diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..d838934
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,7 @@
+*.jar
+
+project/project
+project/target
+target
+
+*.crc
diff --git a/LICENSE b/LICENSE
new file mode 100644
index 0000000..8dada3e
--- /dev/null
+++ b/LICENSE
@@ -0,0 +1,201 @@
+ Apache License
+ Version 2.0, January 2004
+ http://www.apache.org/licenses/
+
+ TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+ 1. Definitions.
+
+ "License" shall mean the terms and conditions for use, reproduction,
+ and distribution as defined by Sections 1 through 9 of this document.
+
+ "Licensor" shall mean the copyright owner or entity authorized by
+ the copyright owner that is granting the License.
+
+ "Legal Entity" shall mean the union of the acting entity and all
+ other entities that control, are controlled by, or are under common
+ control with that entity. For the purposes of this definition,
+ "control" means (i) the power, direct or indirect, to cause the
+ direction or management of such entity, whether by contract or
+ otherwise, or (ii) ownership of fifty percent (50%) or more of the
+ outstanding shares, or (iii) beneficial ownership of such entity.
+
+ "You" (or "Your") shall mean an individual or Legal Entity
+ exercising permissions granted by this License.
+
+ "Source" form shall mean the preferred form for making modifications,
+ including but not limited to software source code, documentation
+ source, and configuration files.
+
+ "Object" form shall mean any form resulting from mechanical
+ transformation or translation of a Source form, including but
+ not limited to compiled object code, generated documentation,
+ and conversions to other media types.
+
+ "Work" shall mean the work of authorship, whether in Source or
+ Object form, made available under the License, as indicated by a
+ copyright notice that is included in or attached to the work
+ (an example is provided in the Appendix below).
+
+ "Derivative Works" shall mean any work, whether in Source or Object
+ form, that is based on (or derived from) the Work and for which the
+ editorial revisions, annotations, elaborations, or other modifications
+ represent, as a whole, an original work of authorship. For the purposes
+ of this License, Derivative Works shall not include works that remain
+ separable from, or merely link (or bind by name) to the interfaces of,
+ the Work and Derivative Works thereof.
+
+ "Contribution" shall mean any work of authorship, including
+ the original version of the Work and any modifications or additions
+ to that Work or Derivative Works thereof, that is intentionally
+ submitted to Licensor for inclusion in the Work by the copyright owner
+ or by an individual or Legal Entity authorized to submit on behalf of
+ the copyright owner. For the purposes of this definition, "submitted"
+ means any form of electronic, verbal, or written communication sent
+ to the Licensor or its representatives, including but not limited to
+ communication on electronic mailing lists, source code control systems,
+ and issue tracking systems that are managed by, or on behalf of, the
+ Licensor for the purpose of discussing and improving the Work, but
+ excluding communication that is conspicuously marked or otherwise
+ designated in writing by the copyright owner as "Not a Contribution."
+
+ "Contributor" shall mean Licensor and any individual or Legal Entity
+ on behalf of whom a Contribution has been received by Licensor and
+ subsequently incorporated within the Work.
+
+ 2. Grant of Copyright License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ copyright license to reproduce, prepare Derivative Works of,
+ publicly display, publicly perform, sublicense, and distribute the
+ Work and such Derivative Works in Source or Object form.
+
+ 3. Grant of Patent License. Subject to the terms and conditions of
+ this License, each Contributor hereby grants to You a perpetual,
+ worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+ (except as stated in this section) patent license to make, have made,
+ use, offer to sell, sell, import, and otherwise transfer the Work,
+ where such license applies only to those patent claims licensable
+ by such Contributor that are necessarily infringed by their
+ Contribution(s) alone or by combination of their Contribution(s)
+ with the Work to which such Contribution(s) was submitted. If You
+ institute patent litigation against any entity (including a
+ cross-claim or counterclaim in a lawsuit) alleging that the Work
+ or a Contribution incorporated within the Work constitutes direct
+ or contributory patent infringement, then any patent licenses
+ granted to You under this License for that Work shall terminate
+ as of the date such litigation is filed.
+
+ 4. Redistribution. You may reproduce and distribute copies of the
+ Work or Derivative Works thereof in any medium, with or without
+ modifications, and in Source or Object form, provided that You
+ meet the following conditions:
+
+ (a) You must give any other recipients of the Work or
+ Derivative Works a copy of this License; and
+
+ (b) You must cause any modified files to carry prominent notices
+ stating that You changed the files; and
+
+ (c) You must retain, in the Source form of any Derivative Works
+ that You distribute, all copyright, patent, trademark, and
+ attribution notices from the Source form of the Work,
+ excluding those notices that do not pertain to any part of
+ the Derivative Works; and
+
+ (d) If the Work includes a "NOTICE" text file as part of its
+ distribution, then any Derivative Works that You distribute must
+ include a readable copy of the attribution notices contained
+ within such NOTICE file, excluding those notices that do not
+ pertain to any part of the Derivative Works, in at least one
+ of the following places: within a NOTICE text file distributed
+ as part of the Derivative Works; within the Source form or
+ documentation, if provided along with the Derivative Works; or,
+ within a display generated by the Derivative Works, if and
+ wherever such third-party notices normally appear. The contents
+ of the NOTICE file are for informational purposes only and
+ do not modify the License. You may add Your own attribution
+ notices within Derivative Works that You distribute, alongside
+ or as an addendum to the NOTICE text from the Work, provided
+ that such additional attribution notices cannot be construed
+ as modifying the License.
+
+ You may add Your own copyright statement to Your modifications and
+ may provide additional or different license terms and conditions
+ for use, reproduction, or distribution of Your modifications, or
+ for any such Derivative Works as a whole, provided Your use,
+ reproduction, and distribution of the Work otherwise complies with
+ the conditions stated in this License.
+
+ 5. Submission of Contributions. Unless You explicitly state otherwise,
+ any Contribution intentionally submitted for inclusion in the Work
+ by You to the Licensor shall be under the terms and conditions of
+ this License, without any additional terms or conditions.
+ Notwithstanding the above, nothing herein shall supersede or modify
+ the terms of any separate license agreement you may have executed
+ with Licensor regarding such Contributions.
+
+ 6. Trademarks. This License does not grant permission to use the trade
+ names, trademarks, service marks, or product names of the Licensor,
+ except as required for reasonable and customary use in describing the
+ origin of the Work and reproducing the content of the NOTICE file.
+
+ 7. Disclaimer of Warranty. Unless required by applicable law or
+ agreed to in writing, Licensor provides the Work (and each
+ Contributor provides its Contributions) on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ implied, including, without limitation, any warranties or conditions
+ of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+ PARTICULAR PURPOSE. You are solely responsible for determining the
+ appropriateness of using or redistributing the Work and assume any
+ risks associated with Your exercise of permissions under this License.
+
+ 8. Limitation of Liability. In no event and under no legal theory,
+ whether in tort (including negligence), contract, or otherwise,
+ unless required by applicable law (such as deliberate and grossly
+ negligent acts) or agreed to in writing, shall any Contributor be
+ liable to You for damages, including any direct, indirect, special,
+ incidental, or consequential damages of any character arising as a
+ result of this License or out of the use or inability to use the
+ Work (including but not limited to damages for loss of goodwill,
+ work stoppage, computer failure or malfunction, or any and all
+ other commercial damages or losses), even if such Contributor
+ has been advised of the possibility of such damages.
+
+ 9. Accepting Warranty or Additional Liability. While redistributing
+ the Work or Derivative Works thereof, You may choose to offer,
+ and charge a fee for, acceptance of support, warranty, indemnity,
+ or other liability obligations and/or rights consistent with this
+ License. However, in accepting such obligations, You may act only
+ on Your own behalf and on Your sole responsibility, not on behalf
+ of any other Contributor, and only if You agree to indemnify,
+ defend, and hold each Contributor harmless for any liability
+ incurred by, or claims asserted against, such Contributor by reason
+ of your accepting any such warranty or additional liability.
+
+ END OF TERMS AND CONDITIONS
+
+ APPENDIX: How to apply the Apache License to your work.
+
+ To apply the Apache License to your work, attach the following
+ boilerplate notice, with the fields enclosed by brackets "{}"
+ replaced with your own identifying information. (Don't include
+ the brackets!) The text should be enclosed in the appropriate
+ comment syntax for the file format. We also recommend that a
+ file or class name and description of purpose be included on the
+ same "printed page" as the copyright notice for easier
+ identification within third-party archives.
+
+ Copyright {yyyy} {name of copyright owner}
+
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
diff --git a/README.md b/README.md
new file mode 100644
index 0000000..47ef69e
--- /dev/null
+++ b/README.md
@@ -0,0 +1,119 @@
+## Overview
+
+
+Version: 1.0.0
+
+API Scaladoc: [SparkHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.SparkHelper)
+
+This library contains a bunch of low-level basic methods for data processing
+with Scala Spark. It is split into 5 modules:
+
+* [HdfsHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.HdfsHelper): Wrapper around the Apache Hadoop FileSystem API ([org.apache.hadoop.fs.FileSystem](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html)) for file manipulations on hdfs.
+* [SparkHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.SparkHelper): Hdfs file manipulations through the Spark API.
+* [DateHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.DateHelper): Wrapper around [joda-time](http://www.joda.org/joda-time/apidocs/) for date manipulations.
+* [FieldChecker](http://xavierguihot.github.io/spark_helper/#com.spark_helper.FieldChecker): Validation for stringified fields
+* [Monitor](http://xavierguihot.github.io/spark_helper/#com.spark_helper.monitoring.Monitor): Spark custom monitoring/logger and kpi validator
+
+The goal is to remove as much as possible of the highly used and highly
+duplicated low-level code from your Spark job and replace it with fully tested
+methods whose names are self-explanatory and readable.
+
+
+## Using spark_helper:
+
+### HdfsHelper:
+
+The full list of methods is available at [HdfsHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.HdfsHelper).
+
+Contains basic file-related methods mostly based on the Apache Hadoop
+FileSystem API [org.apache.hadoop.fs.FileSystem](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html).
+
+A few examples:
+
+ import com.spark_helper.HdfsHelper
+
+ // A bunch of methods wrapping the FileSystem API, such as:
+ HdfsHelper.fileExists("my/hdfs/file/path.txt")
+ assert(HdfsHelper.listFileNamesInFolder("my/folder/path") == List("file_name_1.txt", "file_name_2.csv"))
+ assert(HdfsHelper.getFileModificationDate("my/hdfs/file/path.txt") == "20170306")
+ assert(HdfsHelper.getNbrOfDaysSinceFileWasLastModified("my/hdfs/file/path.txt") == 3)
+
+ // Some Xml helpers for hadoop as well:
+ HdfsHelper.isHdfsXmlCompliantWithXsd("my/hdfs/file/path.xml", getClass.getResource("/some_xml.xsd"))
+
+### SparkHelper:
+
+The full list of methods is available at [SparkHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.SparkHelper).
+
+Contains basic file/RDD-related methods based on the Spark APIs.
+
+A few examples:
+
+ import com.spark_helper.SparkHelper
+
+ // Same as SparkContext.saveAsTextFile, but the result is a single file:
+ SparkHelper.saveAsSingleTextFile(myOutputRDD, "/my/output/file/path.txt")
+ // Same as SparkContext.textFile, but instead of reading one record per line,
+ // it reads records spread over several lines:
+ SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+
+### DateHelper:
+
+The full list of methods is available at [DateHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.DateHelper).
+
+Wrapper around [joda-time](http://www.joda.org/joda-time/apidocs/) for date manipulations.
+
+A few examples:
+
+ import com.spark_helper.DateHelper
+
+ assert(DateHelper.daysBetween("20161230", "20170101") == List("20161230", "20161231", "20170101"))
+ assert(DateHelper.today() == "20170310") // If today's "20170310"
+ assert(DateHelper.yesterday() == "20170309") // If today's "20170310"
+ assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") == "170327")
+ assert(DateHelper.now("HH:mm") == "10:24")
+
+### FieldChecker
+
+The full list of methods is available at [FieldChecker](http://xavierguihot.github.io/spark_helper/#com.spark_helper.FieldChecker).
+
+Validation (before cast) for stringified fields.
+
+A few examples:
+
+ import com.spark_helper.FieldChecker
+
+ assert(FieldChecker.isInteger("15"))
+ assert(!FieldChecker.isInteger("1.5"))
+ assert(FieldChecker.isInteger("-1"))
+ assert(FieldChecker.isStrictlyPositiveInteger("123"))
+ assert(!FieldChecker.isYyyyMMddDate("20170333"))
+ assert(FieldChecker.isCurrencyCode("USD"))
+
+### Monitor:
+
+The full list of methods is available at [Monitor](http://xavierguihot.github.io/spark_helper/#com.spark_helper.monitoring.Monitor).
+
+It's a simple logger/report that you update during your job. It contains a
+report (a simple string) and a success boolean, both of which you can update
+to reflect the status of your job. At the end of your job you can store the
+report in hdfs.
+
+Have a look at the [scaladoc](http://xavierguihot.github.io/spark_helper/#com.spark_helper.monitoring.Monitor)
+for a cool example.
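+
+In the meantime, here is a minimal sketch of the intended workflow (the method
+names below are illustrative assumptions, not the documented API; see the
+scaladoc above for the actual method names):
+
+	import com.spark_helper.monitoring.Monitor
+
+	// Illustrative sketch only (assumed method names):
+	val monitor = new Monitor()
+	monitor.updateReport("The first step of the job is done")
+	// ... perform kpi checks and update the success boolean accordingly ...
+	monitor.saveReport("my/hdfs/report/folder")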
+
+
+## Including spark_helper in your dependencies:
+
+
+With sbt, just add this one line to your build.sbt:
+
+ libraryDependencies += "spark_helper" % "spark_helper" % "1.0.0" from "https://github.com/xavierguihot/spark_helper/releases/download/v1.0.0/spark_helper-1.0.0.jar"
+
+
+## Building the project:
+
+
+With sbt:
+
+ sbt assembly
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..162f032
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,28 @@
+name := "spark_helper"
+
+version := "1.0.0"
+
+scalaVersion := "2.10.4"
+
+scalacOptions ++= Seq("-unchecked", "-deprecation", "-feature", "-Xfatal-warnings")
+
+assemblyJarName in assembly := name.value + "-" + version.value + ".jar"
+
+assemblyOutputPath in assembly := file("./" + name.value + "-" + version.value + ".jar")
+
+libraryDependencies += "org.scalatest" %% "scalatest" % "1.9.1" % "test"
+
+libraryDependencies += "joda-time" % "joda-time" % "2.9.4"
+
+libraryDependencies += "org.joda" % "joda-convert" % "1.2"
+
+libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.1" % "provided"
+
+libraryDependencies += "org.apache.commons" % "commons-lang3" % "3.5"
+
+parallelExecution in Test := false
+
+assemblyMergeStrategy in assembly := {
+ case PathList("META-INF", xs @ _*) => MergeStrategy.discard
+ case x => MergeStrategy.first
+}
diff --git a/project/plugins.sbt b/project/plugins.sbt
new file mode 100644
index 0000000..b0ecb6c
--- /dev/null
+++ b/project/plugins.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.2")
\ No newline at end of file
diff --git a/src/main/scala/com/spark_helper/DateHelper.scala b/src/main/scala/com/spark_helper/DateHelper.scala
new file mode 100644
index 0000000..c877c20
--- /dev/null
+++ b/src/main/scala/com/spark_helper/DateHelper.scala
@@ -0,0 +1,344 @@
+package com.spark_helper
+
+import org.joda.time.DateTime
+import org.joda.time.DateTimeZone
+import org.joda.time.Days
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.DateTimeFormatter
+
+/** A facility which deals with usual date needs (wrapper around [joda-time](http://www.joda.org/joda-time/apidocs/)).
+ *
+ * The goal is to remove as much as possible of the highly used low-level code
+ * from your spark job and replace it with fully tested methods whose names
+ * are self-explanatory/readable.
+ *
+ * A few examples:
+ *
+ * {{{
+ * assert(DateHelper.daysBetween("20161230", "20170101") == List("20161230", "20161231", "20170101"))
+ * assert(DateHelper.today() == "20170310") // If today's "20170310"
+ * assert(DateHelper.yesterday() == "20170309") // If today's "20170310"
+ * assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") == "170327")
+ * assert(DateHelper.now("HH:mm") == "10:24")
+ * }}}
+ *
+ * Source DateHelper
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+object DateHelper extends Serializable {
+
+ /** Finds the list of dates between the two given dates.
+ *
+ * {{{
+ * assert(DateHelper.daysBetween("20161230", "20170101") == List("20161230", "20161231", "20170101"))
+ * }}}
+ *
+ * @param firstDate the first date (in the given format)
+ * @param lastDate the last date (in the given format)
+ * @param format (default = "yyyyMMdd") the format to use for firstDate and
+ * lastDate and for the returned list of dates.
+ * @return the list of dates between firstDate and lastDate in the given
+ * format.
+ */
+ def daysBetween(
+ firstDate: String, lastDate: String, format: String = "yyyyMMdd"
+ ): List[String] = {
+
+ val formatter = DateTimeFormat.forPattern(format)
+
+ jodaDaysBetween(
+ formatter.parseDateTime(firstDate),
+ formatter.parseDateTime(lastDate)
+ ).map(
+ formatter.print
+ )
+ }
+
+ /** Finds the list of dates between the two given dates.
+ *
+ * Sometimes we might want to do additional filtering/operations on the
+ * returned list of dates and thus prefer getting a list of Joda DateTime
+ * objects instead of String dates.
+ *
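+	 * A minimal sketch, assuming a "yyyyMMdd" input format and an illustrative
+	 * week-day filter:
+	 *
+	 * {{{
+	 * val formatter = DateTimeFormat.forPattern("yyyyMMdd")
+	 * DateHelper.jodaDaysBetween(
+	 *   formatter.parseDateTime("20170228"), formatter.parseDateTime("20170305")
+	 * ).filter(_.getDayOfWeek <= 5) // keeps Monday to Friday only
+	 * }}}
+	 *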
+ * @param jodaFirstDate the joda DateTime first date
+ * @param jodaLastDate the joda DateTime last date
+ * @return the list of joda DateTime between jodaFirstDate and jodaLastDate
+ */
+ def jodaDaysBetween(
+ jodaFirstDate: DateTime, jodaLastDate: DateTime
+ ): List[DateTime] = {
+
+ val nbrOfDaysWithinRange = Days.daysBetween(
+ jodaFirstDate, jodaLastDate
+ ).getDays()
+
+ (0 to nbrOfDaysWithinRange).toList.map(jodaFirstDate.plusDays)
+ }
+
+ /** Returns which date it was x days before today under the requested format.
+ *
+	 * If today is "20170125" and we request the date 3 days before, we get
+	 * "20170122".
+ *
+ * {{{
+ * // If today's "20170310":
+ * assert(DateHelper.nDaysBefore(3) == "20170307")
+ * assert(DateHelper.nDaysBefore(5, "yyMMdd") == "170305")
+ * }}}
+ *
+ * @param nbrOfDaysBefore the nbr of days before today
+ * @param format (default = "yyyyMMdd") the format for the returned date
+ * @return today's date minus the nbrOfDaysBefore under the requested
+ * format.
+ */
+ def nDaysBefore(nbrOfDaysBefore: Int, format: String = "yyyyMMdd"): String = {
+ DateTimeFormat.forPattern(format).print(
+ new DateTime().minusDays(nbrOfDaysBefore)
+ )
+ }
+
+ /** Returns which date it was x days before the given date.
+ *
+ * If the given date is "20170125" and we request the date it was 3 days
+ * before, we'll return "20170122".
+ *
+ * {{{
+ * assert(DateHelper.nDaysBeforeDate(3, "20170310") == "20170307")
+ * assert(DateHelper.nDaysBeforeDate(5, "170310", "yyMMdd") == "170305")
+ * }}}
+ *
+ * @param nbrOfDaysBefore the nbr of days before the given date
+ * @param date the date under the provided format for which we want the
+ * date for nbrOfDaysBefore days before.
+ * @param format (default = "yyyyMMdd") the format for the provided and
+ * returned dates.
+	 * @return the date nbrOfDaysBefore days before the given date, under the
+	 * requested format.
+ */
+ def nDaysBeforeDate(
+ nbrOfDaysBefore: Int, date: String, format: String = "yyyyMMdd"
+ ): String = {
+ val currentDate = DateTimeFormat.forPattern(format).parseDateTime(date)
+ DateTimeFormat.forPattern(format).print(currentDate.minusDays(nbrOfDaysBefore))
+ }
+
+ /** Returns which date it will be x days after the given date.
+ *
+ * If the given date is "20170122" and we request the date it will be 3
+ * days after, we'll return "20170125".
+ *
+ * {{{
+	 * assert(DateHelper.nDaysAfterDate(3, "20170307") == "20170310")
+	 * assert(DateHelper.nDaysAfterDate(5, "170305", "yyMMdd") == "170310")
+ * }}}
+ *
+ * @param nbrOfDaysAfter the nbr of days after the given date
+ * @param date the date under the provided format for which we want the
+ * date for nbrOfDaysAfter days after.
+ * @param format (default = "yyyyMMdd") the format for the provided and
+ * returned dates.
+	 * @return the date nbrOfDaysAfter days after the given date, under the
+	 * requested format.
+ */
+ def nDaysAfterDate(
+ nbrOfDaysAfter: Int, date: String, format: String = "yyyyMMdd"
+ ): String = {
+ val currentDate = DateTimeFormat.forPattern(format).parseDateTime(date)
+ DateTimeFormat.forPattern(format).print(currentDate.plusDays(nbrOfDaysAfter))
+ }
+
+ /** Returns today's date/time under the requested format.
+ *
+ * {{{
+ * // If today's "20170310":
+ * assert(DateHelper.now() == "20170310_1047")
+ * assert(DateHelper.now("yyyyMMdd") == "20170310")
+ * }}}
+ *
+ * @param format (default = "yyyyMMdd_HHmm") the format for the current date
+ * @return today's date under the requested format
+ */
+ def now(format: String = "yyyyMMdd_HHmm"): String = nDaysBefore(0, format)
+
+ /** Returns today's date/time under the requested format.
+ *
+ * {{{
+ * // If today's "20170310":
+ * assert(DateHelper.today() == "20170310")
+ * assert(DateHelper.today("yyMMdd") == "170310")
+ * }}}
+ *
+ * @param format (default = "yyyyMMdd") the format for the current date
+ * @return today's date under the requested format
+ */
+ def today(format: String = "yyyyMMdd"): String = nDaysBefore(0, format)
+
+ /** Returns yesterday's date/time under the requested format.
+ *
+ * {{{
+ * // If today's "20170310":
+ * assert(DateHelper.yesterday() == "20170309")
+ * assert(DateHelper.yesterday("yyMMdd") == "170309")
+ * }}}
+ *
+ * @param format (default = "yyyyMMdd") the format in which to output the
+ * date of yesterday
+ * @return yesterday's date under the requested format
+ */
+ def yesterday(format: String = "yyyyMMdd"): String = nDaysBefore(1, format)
+
+ /** Returns which date it was 2 days before today under the requested format.
+ *
+ * {{{
+ * // If today's "20170310":
+ * assert(DateHelper.twoDaysAgo() == "20170308")
+ * assert(DateHelper.twoDaysAgo("yyMMdd") == "170308")
+ * }}}
+ *
+ * @param format (default = "yyyyMMdd") the format in which to output the
+ * date of two days ago
+ * @return the date of two days ago under the requested format
+ */
+ def twoDaysAgo(format: String = "yyyyMMdd"): String = nDaysBefore(2, format)
+
+ /** Reformats a date from one format to another.
+ *
+ * {{{
+ * assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") == "170327")
+ * }}}
+ *
+ * @param date the date to reformat
+ * @param inputFormat the format in which the date to reformat is provided
+ * @param outputFormat the format in which to format the provided date
+ * @return the date under the new format
+ */
+ def reformatDate(
+ date: String, inputFormat: String, outputFormat: String
+ ): String = {
+ DateTimeFormat.forPattern(outputFormat).print(
+ DateTimeFormat.forPattern(inputFormat).parseDateTime(date)
+ )
+ }
+
+ /** Returns the current local timestamp.
+ *
+ * {{{ assert(DateHelper.currentTimestamp() == "1493105229736") }}}
+ *
+	 * @return the current timestamp (nbr of millis since 1970-01-01) in the
+ * local computer's zone.
+ */
+ def currentTimestamp(): String = new DateTime().getMillis().toString
+
+ /** Returns the current UTC timestamp.
+ *
+ * {{{ assert(DateHelper.currentUtcTimestamp() == "1493105229736") }}}
+ *
+	 * @return the current UTC timestamp (nbr of millis since 1970-01-01).
+ */
+ def currentUtcTimestamp(): String = {
+ new DateTime().withZone(DateTimeZone.UTC).getMillis().toString
+ }
+
+	/** Returns, for a given date, the date one day later.
+ *
+ * {{{
+ * // If the given date is "20170310":
+ * assert(DateHelper.nextDay("20170310") == "20170311")
+ * assert(DateHelper.nextDay("170310", "yyMMdd") == "170311")
+ * }}}
+ *
+ * @param date the date for which to find the date of the day after
+ * @param format (default = "yyyyMMdd") the format of the provided and the
+ * returned dates.
+ * @return the date of the day after the given date
+ */
+ def nextDay(date: String, format: String = "yyyyMMdd"): String = {
+ val currentDate = DateTimeFormat.forPattern(format).parseDateTime(date)
+ DateTimeFormat.forPattern(format).print(currentDate.plusDays(1))
+ }
+
+	/** Returns, for a given date, the date one day before.
+ *
+ * {{{
+ * // If the given date is "20170310":
+ * assert(DateHelper.previousDay("20170310") == "20170309")
+ * assert(DateHelper.previousDay("170310", "yyMMdd") == "170309")
+ * }}}
+ *
+ * @param date the date for which to find the date of the day before
+ * @param format (default = "yyyyMMdd") the format of the provided and the
+ * returned dates.
+ * @return the date of the day before the given date
+ */
+ def previousDay(date: String, format: String = "yyyyMMdd"): String = {
+ val currentDate = DateTimeFormat.forPattern(format).parseDateTime(date)
+ DateTimeFormat.forPattern(format).print(currentDate.minusDays(1))
+ }
+
+ /** Returns the nbr of days between today and the given date.
+ *
+ * {{{
+ * // If today is "20170327":
+ * assert(DateHelper.nbrOfDaysSince("20170310") == 17)
+ * assert(DateHelper.nbrOfDaysSince("170310", "yyMMdd") == 17)
+ * }}}
+ *
+ * @param date the date for which to find the nbr of days of diff with today
+ * @param format (default = "yyyyMMdd") the format of the provided date
+ * @return the nbr of days between today and the given date
+ */
+ def nbrOfDaysSince(date: String, format: String = "yyyyMMdd"): Int = {
+ Days.daysBetween(
+ DateTimeFormat.forPattern(format).parseDateTime(date),
+ new DateTime()
+ ).getDays()
+ }
+
+ /** Returns the nbr of days between the two given dates.
+ *
+ * {{{
+ * assert(DateHelper.nbrOfDaysBetween("20170327", "20170327") == 0)
+ * assert(DateHelper.nbrOfDaysBetween("20170327", "20170401") == 5)
+ * }}}
+ *
+ * This expects the first date to be before the last date.
+ *
+	 * @param firstDate the first date of the range for which to get the nbr of
+	 * days.
+	 * @param lastDate the last date of the range for which to get the nbr of
+	 * days.
+ * @param format (default = "yyyyMMdd") the format of the provided dates
+ * @return the nbr of days between the two given dates
+ */
+ def nbrOfDaysBetween(
+ firstDate: String, lastDate: String, format: String = "yyyyMMdd"
+ ): Int = {
+ Days.daysBetween(
+ DateTimeFormat.forPattern(format).parseDateTime(firstDate),
+ DateTimeFormat.forPattern(format).parseDateTime(lastDate)
+ ).getDays()
+ }
+
+ /** Returns the date associated to the given UTC timestamp.
+ *
+ * {{{
+ * assert(DateHelper.getDateFromTimestamp(1496074819L) == "20170529")
+ * assert(DateHelper.getDateFromTimestamp(1496074819L, "yyMMdd") == "170529")
+ * }}}
+ *
+	 * @param timestamp the UTC timestamp (nbr of seconds since 1970-01-01) for
+	 * which to get the associated date.
+	 * @param format (default = "yyyyMMdd") the format of the returned date
+ * @return the associated date under the requested format
+ */
+ def getDateFromTimestamp(
+ timestamp: Long, format: String = "yyyyMMdd"
+ ): String = {
+ DateTimeFormat.forPattern(format).print(
+ new DateTime(timestamp * 1000L, DateTimeZone.UTC)
+ )
+ }
+}
diff --git a/src/main/scala/com/spark_helper/FieldChecker.scala b/src/main/scala/com/spark_helper/FieldChecker.scala
new file mode 100644
index 0000000..2731b7c
--- /dev/null
+++ b/src/main/scala/com/spark_helper/FieldChecker.scala
@@ -0,0 +1,374 @@
+package com.spark_helper
+
+import Math.round
+
+import org.joda.time.DateTime
+import org.joda.time.format.DateTimeFormat
+import org.joda.time.format.DateTimeFormatter
+import org.joda.time.IllegalFieldValueException
+
+/** A facility which validates a value for a specific type of field.
+ *
+ * For instance, this allows validating that a stringified integer is indeed
+ * an integer, that a date is under the yyyyMMdd format, or that an airport
+ * code is a 3-upper-char string:
+ *
+ * A few examples:
+ *
+ * {{{
+ * assert(FieldChecker.isInteger("15"))
+ * assert(!FieldChecker.isInteger("1.5"))
+ * assert(FieldChecker.isInteger("-1"))
+ * assert(FieldChecker.isStrictlyPositiveInteger("123"))
+ * assert(!FieldChecker.isYyyyMMddDate("20170333"))
+ * assert(FieldChecker.isCurrencyCode("USD"))
+ * }}}
+ *
+ * Source FieldChecker
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+object FieldChecker extends Serializable {
+
+ /** Validates a string is an integer.
+ *
+ * {{{
+ * assert(!FieldChecker.isInteger(""))
+ * assert(!FieldChecker.isInteger("sdc"))
+ * assert(FieldChecker.isInteger("0"))
+ * assert(FieldChecker.isInteger("15"))
+ * assert(FieldChecker.isInteger("-1"))
+ * assert(!FieldChecker.isInteger("-1.5"))
+ * assert(!FieldChecker.isInteger("1.5"))
+ * assert(FieldChecker.isInteger("0452"))
+ * }}}
+ *
+ * @param stringValue the stringified integer
+ * @return if the stringified integer is an integer
+ */
+ def isInteger(stringValue: String): Boolean = {
+ try {
+ round(stringValue.toDouble).toFloat == stringValue.toFloat
+ } catch {
+ case nfe: NumberFormatException => false
+ }
+ }
+
+	/** Validates a string is a positive integer.
+ *
+ * {{{
+ * assert(!FieldChecker.isPositiveInteger(""))
+ * assert(!FieldChecker.isPositiveInteger("sdc"))
+ * assert(FieldChecker.isPositiveInteger("0"))
+ * assert(FieldChecker.isPositiveInteger("15"))
+ * assert(!FieldChecker.isPositiveInteger("-1"))
+ * assert(!FieldChecker.isPositiveInteger("-1.5"))
+ * assert(!FieldChecker.isPositiveInteger("1.5"))
+ * assert(FieldChecker.isPositiveInteger("0452"))
+ * }}}
+ *
+ * @param stringValue the stringified positive integer
+ * @return if the stringified positive integer is a positive integer
+ */
+ def isPositiveInteger(stringValue: String): Boolean = {
+ isInteger(stringValue) && round(stringValue.toDouble).toInt >= 0
+ }
+
+ /** Validates a string is a strictly positive integer.
+ *
+ * {{{
+ * assert(!FieldChecker.isStrictlyPositiveInteger(""))
+ * assert(!FieldChecker.isStrictlyPositiveInteger("sdc"))
+ * assert(!FieldChecker.isStrictlyPositiveInteger("0"))
+ * assert(FieldChecker.isStrictlyPositiveInteger("1"))
+ * assert(FieldChecker.isStrictlyPositiveInteger("15"))
+ * assert(!FieldChecker.isStrictlyPositiveInteger("-1"))
+ * assert(!FieldChecker.isStrictlyPositiveInteger("-1.5"))
+ * assert(!FieldChecker.isStrictlyPositiveInteger("1.5"))
+ * assert(FieldChecker.isStrictlyPositiveInteger("0452"))
+ * }}}
+ *
+ * @param stringValue the stringified strictly positive integer
+ * @return if the stringified strictly positive integer is a strictly
+ * positive integer.
+ */
+ def isStrictlyPositiveInteger(stringValue: String): Boolean = {
+ isInteger(stringValue) && round(stringValue.toDouble).toInt > 0
+ }
+
+ /** Validates a string is a float.
+ *
+ * {{{
+ * assert(!FieldChecker.isFloat(""))
+ * assert(!FieldChecker.isFloat("sdc"))
+ * assert(FieldChecker.isFloat("0"))
+ * assert(FieldChecker.isFloat("15"))
+ * assert(FieldChecker.isFloat("-1"))
+ * assert(FieldChecker.isFloat("-1.5"))
+ * assert(FieldChecker.isFloat("1.5"))
+ * }}}
+ *
+ * @param stringValue the stringified float
+	 * @return if the stringified float is a float
+ */
+ def isFloat(stringValue: String): Boolean = {
+ try {
+ stringValue.toFloat
+ true
+ } catch {
+ case nfe: NumberFormatException => false
+ }
+ }
+
+ /** Validates a string is a positive float.
+ *
+ * {{{
+ * assert(!FieldChecker.isPositiveFloat(""))
+ * assert(!FieldChecker.isPositiveFloat("sdc"))
+ * assert(FieldChecker.isPositiveFloat("0"))
+ * assert(FieldChecker.isPositiveFloat("15"))
+ * assert(!FieldChecker.isPositiveFloat("-1"))
+ * assert(!FieldChecker.isPositiveFloat("-1.5"))
+ * assert(FieldChecker.isPositiveFloat("1.5"))
+ * }}}
+ *
+ * @param stringValue the stringified positive float
+ * @return if the stringified positive float is a positive float
+ */
+ def isPositiveFloat(stringValue: String): Boolean = {
+ isFloat(stringValue) && stringValue.toFloat >= 0
+ }
+
+ /** Validates a string is a strictly positive float.
+ *
+ * {{{
+ * assert(!FieldChecker.isStrictlyPositiveFloat(""))
+ * assert(!FieldChecker.isStrictlyPositiveFloat("sdc"))
+ * assert(!FieldChecker.isStrictlyPositiveFloat("0"))
+ * assert(FieldChecker.isStrictlyPositiveFloat("15"))
+ * assert(!FieldChecker.isStrictlyPositiveFloat("-1"))
+ * assert(!FieldChecker.isStrictlyPositiveFloat("-1.5"))
+ * assert(FieldChecker.isStrictlyPositiveFloat("1.5"))
+ * }}}
+ *
+ * @param stringValue the stringified strictly positive float
+ * @return if the stringified strictly positive float is a strictly
+ * positive float.
+ */
+ def isStrictlyPositiveFloat(stringValue: String): Boolean = {
+ isFloat(stringValue) && stringValue.toFloat > 0
+ }
+
+ /** Validates a string is a yyyyMMdd date.
+ *
+ * {{{
+ * assert(FieldChecker.isYyyyMMddDate("20170302"))
+ * assert(!FieldChecker.isYyyyMMddDate("20170333"))
+ * assert(FieldChecker.isYyyyMMddDate("20170228"))
+ * assert(!FieldChecker.isYyyyMMddDate("20170229"))
+ * assert(!FieldChecker.isYyyyMMddDate("170228"))
+ * assert(!FieldChecker.isYyyyMMddDate(""))
+ * assert(!FieldChecker.isYyyyMMddDate("a"))
+ * assert(!FieldChecker.isYyyyMMddDate("24JAN17"))
+ * }}}
+ *
+ * @param stringValue the stringified yyyyMMdd date
+ * @return if the stringified yyyyMMdd date is a yyyyMMdd date
+ */
+ def isYyyyMMddDate(stringValue: String): Boolean = {
+ isDateCompliantWithFormat(stringValue, "yyyyMMdd")
+ }
+
+ /** Validates a string is a yyMMdd date.
+ *
+ * {{{
+ * assert(FieldChecker.isYyMMddDate("170302"))
+ * assert(!FieldChecker.isYyMMddDate("170333"))
+ * assert(FieldChecker.isYyMMddDate("170228"))
+ * assert(!FieldChecker.isYyMMddDate("170229"))
+ * assert(!FieldChecker.isYyMMddDate("20170228"))
+ * assert(!FieldChecker.isYyMMddDate(""))
+ * assert(!FieldChecker.isYyMMddDate("a"))
+ * assert(!FieldChecker.isYyMMddDate("24JAN17"))
+ * }}}
+ *
+ * @param stringValue the stringified yyMMdd date
+ * @return if the stringified yyMMdd date is a yyMMdd date
+ */
+ def isYyMMddDate(stringValue: String): Boolean = {
+ isDateCompliantWithFormat(stringValue, "yyMMdd")
+ }
+
+ /** Validates a string is a HHmm time.
+ *
+ * {{{
+ * assert(FieldChecker.isHHmmTime("1224"))
+ * assert(FieldChecker.isHHmmTime("0023"))
+ * assert(!FieldChecker.isHHmmTime("2405"))
+ * assert(!FieldChecker.isHHmmTime("12:24"))
+ * assert(!FieldChecker.isHHmmTime("23"))
+ * assert(!FieldChecker.isHHmmTime(""))
+ * assert(!FieldChecker.isHHmmTime("a"))
+ * assert(!FieldChecker.isHHmmTime("24JAN17"))
+ * }}}
+ *
+ * @param stringValue the stringified HHmm time
+ * @return if the stringified HHmm time is a HHmm time
+ */
+ def isHHmmTime(stringValue: String): Boolean = {
+ isDateCompliantWithFormat(stringValue, "HHmm")
+ }
+
+ /** Validates a string date is under the provided format.
+ *
+ * {{{
+ * assert(FieldChecker.isDateCompliantWithFormat("20170302", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("20170333", "yyyyMMdd"))
+ * assert(FieldChecker.isDateCompliantWithFormat("20170228", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("20170229", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("170228", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("a", "yyyyMMdd"))
+ * assert(!FieldChecker.isDateCompliantWithFormat("24JAN17", "yyyyMMdd"))
+ * }}}
+ *
+ * @param stringValue the stringified date
+ * @return if the provided date is under the provided format
+ */
+ def isDateCompliantWithFormat(stringValue: String, format: String): Boolean = {
+ try {
+ DateTimeFormat.forPattern(format).parseDateTime(stringValue)
+ true
+ } catch {
+ case ife: IllegalFieldValueException => false
+ case iae: IllegalArgumentException => false
+ }
+ }
+
+	/** Validates a string is a 3-upper-char city/airport code.
+ *
+ * {{{
+ * assert(FieldChecker.isLocationCode("ORY"))
+ * assert(FieldChecker.isLocationCode("NYC "))
+ * assert(!FieldChecker.isLocationCode(""))
+ * assert(!FieldChecker.isLocationCode("ORd"))
+ * assert(!FieldChecker.isLocationCode("FR"))
+ * assert(!FieldChecker.isLocationCode("ORYD"))
+ * }}}
+ *
+ * @param stringValue the airport or city code
+ * @return if the airport or city code looks like a location code
+ */
+ def isLocationCode(stringValue: String): Boolean = {
+ val value = stringValue.trim
+ value.toUpperCase() == value && value.length == 3
+ }
+
+	/** Validates a string is a 3-upper-char airport code.
+ *
+ * {{{
+ * assert(FieldChecker.isAirportCode("ORY"))
+ * assert(FieldChecker.isAirportCode("NYC "))
+ * assert(!FieldChecker.isAirportCode(""))
+ * assert(!FieldChecker.isAirportCode("ORd"))
+ * assert(!FieldChecker.isAirportCode("FR"))
+ * assert(!FieldChecker.isAirportCode("ORYD"))
+ * }}}
+ *
+ * @param stringValue the airport code
+ * @return if the airport code looks like an airport code
+ */
+ def isAirportCode(stringValue: String): Boolean = isLocationCode(stringValue)
+
+	/** Validates a string is a 3-upper-char city code.
+	 *
+	 * {{{
+	 * assert(FieldChecker.isCityCode("PAR"))
+	 * assert(FieldChecker.isCityCode("NCE "))
+	 * assert(!FieldChecker.isCityCode(""))
+	 * assert(!FieldChecker.isCityCode("ORd"))
+	 * assert(!FieldChecker.isCityCode("FR"))
+	 * assert(!FieldChecker.isCityCode("ORYD"))
+ * }}}
+ *
+ * @param stringValue the city code
+ * @return if the city code looks like a city code
+ */
+ def isCityCode(stringValue: String): Boolean = isLocationCode(stringValue)
+
+	/** Validates a string is a 3-upper-char currency code.
+ *
+ * {{{
+ * assert(FieldChecker.isCurrencyCode("EUR"))
+ * assert(FieldChecker.isCurrencyCode("USD "))
+ * assert(!FieldChecker.isCurrencyCode("EUr"))
+ * assert(!FieldChecker.isCurrencyCode(""))
+ * assert(!FieldChecker.isCurrencyCode("EU"))
+ * assert(!FieldChecker.isCurrencyCode("EURD"))
+ * }}}
+ *
+ * @param stringValue the currency code
+ * @return if the currency code looks like a currency code
+ */
+ def isCurrencyCode(stringValue: String): Boolean = {
+ val value = stringValue.trim
+ value.toUpperCase() == value && value.length == 3
+ }
+
+	/** Validates a string is a 2-upper-char country code.
+ *
+ * {{{
+ * assert(FieldChecker.isCountryCode("FR"))
+ * assert(FieldChecker.isCountryCode("US "))
+ * assert(!FieldChecker.isCountryCode(""))
+ * assert(!FieldChecker.isCountryCode("Us"))
+ * assert(!FieldChecker.isCountryCode("USD"))
+ * }}}
+ *
+ * @param stringValue the country code
+ * @return if the country code looks like a country code
+ */
+ def isCountryCode(stringValue: String): Boolean = {
+ val value = stringValue.trim
+ value.toUpperCase() == value && value.length == 2
+ }
+
+	/** Validates a string is a 2-upper-char airline code.
+ *
+ * {{{
+ * assert(FieldChecker.isAirlineCode("AF"))
+ * assert(FieldChecker.isAirlineCode("BA"))
+ * assert(FieldChecker.isAirlineCode("AA "))
+ * assert(!FieldChecker.isAirlineCode(""))
+ * assert(!FieldChecker.isAirlineCode("Af"))
+ * assert(!FieldChecker.isAirlineCode("AFS"))
+ * }}}
+ *
+ * @param stringValue the airline code
+ * @return if the airline code looks like an airline code
+ */
+ def isAirlineCode(stringValue: String): Boolean = {
+ val value = stringValue.trim
+ value.toUpperCase() == value && value.length == 2
+ }
+
+	/** Validates a string is a 1-upper-char cabin/rbd code.
+ *
+ * {{{
+ * assert(FieldChecker.isClassCode("Y"))
+ * assert(FieldChecker.isClassCode("S "))
+ * assert(!FieldChecker.isClassCode(""))
+ * assert(!FieldChecker.isClassCode("s"))
+ * assert(!FieldChecker.isClassCode("SS"))
+ * }}}
+ *
+ * @param stringValue the cabin/rbd code
+ * @return if the cabin/rbd code looks like a cabin/rbd code
+ */
+ def isClassCode(stringValue: String): Boolean = {
+ val value = stringValue.trim
+ value.toUpperCase() == value && value.length == 1
+ }
+}
diff --git a/src/main/scala/com/spark_helper/HdfsHelper.scala b/src/main/scala/com/spark_helper/HdfsHelper.scala
new file mode 100644
index 0000000..7facf65
--- /dev/null
+++ b/src/main/scala/com/spark_helper/HdfsHelper.scala
@@ -0,0 +1,607 @@
+package com.spark_helper
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.FileUtil
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.IOUtils
+
+import org.joda.time.DateTime
+import org.joda.time.Days
+import org.joda.time.format.DateTimeFormat
+
+import scala.xml.Elem
+import javax.xml.transform.Source
+import javax.xml.transform.stream.StreamSource
+import javax.xml.validation._
+import scala.xml.XML
+import javax.xml.XMLConstants
+
+import org.xml.sax.SAXException
+
+import java.net.URL
+
+import java.io.File
+import java.io.InputStreamReader
+import java.io.IOException
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+import scala.collection.JavaConversions._
+
+/** A facility to deal with file manipulations (wrapper around the Apache Hadoop FileSystem API [org.apache.hadoop.fs.FileSystem](https://hadoop.apache.org/docs/r2.6.1/api/org/apache/hadoop/fs/FileSystem.html)).
+ *
+ * The goal is to remove as much as possible of the highly used low-level code
+ * from your spark job and replace it with fully tested methods whose names
+ * are self-explanatory/readable.
+ *
+ * A few examples:
+ *
+ * {{{
+ * HdfsHelper.fileExists("my/hdfs/file/path.txt")
+ * assert(HdfsHelper.listFileNamesInFolder("my/folder/path") == List("file_name_1.txt", "file_name_2.csv"))
+ * assert(HdfsHelper.getFileModificationDate("my/hdfs/file/path.txt") == "20170306")
+ * assert(HdfsHelper.getNbrOfDaysSinceFileWasLastModified("my/hdfs/file/path.txt") == 3)
+ * HdfsHelper.isHdfsXmlCompliantWithXsd("my/hdfs/file/path.xml", getClass.getResource("/some_xml.xsd"))
+ * }}}
+ *
+ * Source HdfsHelper
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+object HdfsHelper extends Serializable {
+
+ /** Deletes a file on HDFS.
+ *
+ * Doesn't throw an exception if the file to delete doesn't exist.
+ *
+ * @param hdfsPath the path of the file to delete
+ */
+ def deleteFile(hdfsPath: String) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val fileToDelete = new Path(hdfsPath)
+
+ if (fileSystem.exists(fileToDelete)) {
+
+			if (!fileSystem.isFile(fileToDelete))
+ throw new IllegalArgumentException(
+ "To delete a folder, prefer using the deleteFolder() method."
+ )
+
+ else
+ fileSystem.delete(fileToDelete, true)
+ }
+ }
+
+ /** Deletes a folder on HDFS.
+ *
+ * Doesn't throw an exception if the folder to delete doesn't exist.
+ *
+ * @param hdfsPath the path of the folder to delete
+ */
+ def deleteFolder(hdfsPath: String) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val folderToDelete = new Path(hdfsPath)
+
+ if (fileSystem.exists(folderToDelete)) {
+
+ if (fileSystem.isFile(folderToDelete))
+ throw new IllegalArgumentException(
+ "To delete a file, prefer using the deleteFile() method."
+ )
+
+ else
+ fileSystem.delete(folderToDelete, true)
+ }
+ }
+
+ /** Creates a folder on HDFS.
+ *
+ * Doesn't throw an exception if the folder to create already exists.
+ *
+ * @param hdfsPath the path of the folder to create
+ */
+ def createFolder(hdfsPath: String) = {
+ FileSystem.get(new Configuration()).mkdirs(new Path(hdfsPath))
+ }
+
+ /** Checks if the file exists.
+ *
+ * @param hdfsPath the path of the file for which we check if it exists
+ * @return if the file exists
+ */
+ def fileExists(hdfsPath: String): Boolean = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val fileToCheck = new Path(hdfsPath)
+
+ if (fileSystem.exists(fileToCheck) && !fileSystem.isFile(fileToCheck))
+ throw new IllegalArgumentException(
+ "To check if a folder exists, prefer using the folderExists() method."
+ )
+
+ fileSystem.exists(fileToCheck)
+ }
+
+ /** Checks if the folder exists.
+ *
+ * @param hdfsPath the path of the folder for which we check if it exists
+ * @return if the folder exists
+ */
+ def folderExists(hdfsPath: String): Boolean = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val folderToCheck = new Path(hdfsPath)
+
+ if (fileSystem.exists(folderToCheck) && fileSystem.isFile(folderToCheck))
+ throw new IllegalArgumentException(
+ "To check if a file exists, prefer using the fileExists() method."
+ )
+
+ fileSystem.exists(folderToCheck)
+ }
+
+ /** Moves/renames a file.
+ *
+ * Throws an IOException if the value of the parameter overwrite is false
+ * and a file already exists at the target path where to move the file.
+ *
+ * This method deals with performing the "mkdir -p" if the target path has
+ * intermediate folders not yet created.
+ *
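+	 * A minimal usage sketch (paths are illustrative):
+	 *
+	 * {{{
+	 * HdfsHelper.moveFile("my/hdfs/old/path.txt", "my/hdfs/new/path.txt", overwrite = true)
+	 * }}}
+	 *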
+ * @param oldPath the path of the file to rename
+ * @param newPath the new path of the file to rename
+ * @param overwrite (default = false) if true, enable the overwrite of the
+ * destination.
+ * @throws classOf[IOException]
+ */
+ def moveFile(oldPath: String, newPath: String, overwrite: Boolean = false) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val fileToRename = new Path(oldPath)
+ val renamedFile = new Path(newPath)
+
+ if (fileSystem.exists(fileToRename) && !fileSystem.isFile(fileToRename))
+ throw new IllegalArgumentException(
+ "To move a folder, prefer using the moveFolder() method."
+ )
+
+ if (overwrite)
+ fileSystem.delete(renamedFile, true)
+
+ else if (fileSystem.exists(renamedFile))
+ throw new IOException(
+ "A file already exists at target location " + newPath
+ )
+
+ // Before moving the file to its final destination, we check if the
+ // folder where to put the file exists, and if not we create it:
+ val targetContainerFolder = newPath.split("/").init.mkString("/")
+ createFolder(targetContainerFolder)
+
+ fileSystem.rename(fileToRename, renamedFile)
+ }
+
+ /** Moves/renames a folder.
+ *
+ * Throws an IOException if the value of the parameter overwrite is false
+ * and a folder already exists at the target path where to move the folder.
+ *
+ * This method deals with performing the "mkdir -p" if the target path has
+ * intermediate folders not yet created.
+ *
+ * @param oldPath the path of the folder to rename
+ * @param newPath the new path of the folder to rename
+ * @param overwrite (default = false) if true, enable the overwrite of the
+ * destination.
+ * @throws classOf[IOException]
+ */
+ def moveFolder(oldPath: String, newPath: String, overwrite: Boolean = false) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val folderToRename = new Path(oldPath)
+ val renamedFolder = new Path(newPath)
+
+ if (fileSystem.exists(folderToRename) && fileSystem.isFile(folderToRename))
+ throw new IllegalArgumentException(
+ "To move a file, prefer using the moveFile() method."
+ )
+
+ if (overwrite)
+ fileSystem.delete(renamedFolder, true)
+
+ else if (fileSystem.exists(renamedFolder))
+ throw new IOException(
+ "A folder already exists at target location " + newPath
+ )
+
+ // Before moving the folder to its final destination, we check if the
+ // folder where to put the folder exists, and if not we create it:
+ val targetContainerFolder = newPath.split("/").init.mkString("/")
+ createFolder(targetContainerFolder)
+
+ fileSystem.rename(folderToRename, new Path(newPath))
+ }
+
+ /** Saves text in a file when content is too small to really require an RDD.
+ *
+ * Please only consider this way of storing data when the data set is small
+ * enough.
+ *
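+	 * A minimal usage sketch (content and path are illustrative):
+	 *
+	 * {{{
+	 * HdfsHelper.writeToHdfsFile("some\nmulti-line\ncontent", "my/hdfs/file/path.txt")
+	 * }}}
+	 *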
+ * @param content the string to write in the file (you can provide a
+ * string with \n in order to write several lines).
+ * @param filePath the path of the file in which to write the content
+ */
+ def writeToHdfsFile(content: String, filePath: String) = {
+ val outputFile = FileSystem.get(new Configuration()).create(new Path(filePath))
+ outputFile.write(content.getBytes("UTF-8"))
+ outputFile.close()
+ }
+
+ /** Lists file names in the specified hdfs folder.
+ *
+ * {{{
+ * assert(HdfsHelper.listFileNamesInFolder("my/folder/path") == List("file_name_1.txt", "file_name_2.csv"))
+ * }}}
+ *
+ * @param hdfsPath the path of the folder for which to list file names
+ * @param recursive (default = false) if true, list files in subfolders as
+ * well.
+ * @param onlyName (default = true) if false, list paths instead of only
+ * name of files.
+ * @return the list of file names in the specified folder
+ */
+ def listFileNamesInFolder(
+ hdfsPath: String, recursive: Boolean = false, onlyName: Boolean = true
+ ): List[String] = {
+
+ FileSystem.get(new Configuration()).listStatus(
+ new Path(hdfsPath)
+ ).flatMap(
+ status => {
+ // If it's a file:
+ if (status.isFile) {
+ if (onlyName)
+ List(status.getPath.getName)
+ else
+ List(hdfsPath + "/" + status.getPath.getName)
+ }
+ // If it's a dir and we're in a recursive option:
+ else if (recursive)
+ listFileNamesInFolder(
+ hdfsPath + "/" + status.getPath.getName, true, onlyName
+ )
+ // If it's a dir and we're not in a recursive option:
+ else
+ List()
+ }
+ ).toList.sorted
+ }
+
+ /** Lists folder names in the specified hdfs folder.
+ *
+ * {{{
+ * assert(HdfsHelper.listFolderNamesInFolder("my/folder/path") == List("folder_1", "folder_2"))
+ * }}}
+ *
+ * @param hdfsPath the path of the folder for which to list folder names
+ * @return the list of folder names in the specified folder
+ */
+ def listFolderNamesInFolder(hdfsPath: String): List[String] = {
+
+ FileSystem.get(
+ new Configuration()
+ ).listStatus(
+ new Path(hdfsPath)
+ ).filter(
+ !_.isFile
+ ).map(
+ _.getPath.getName
+ ).toList.sorted
+ }
+
+ /** Returns the joda DateTime of the last modification of the given file.
+ *
+ * @param hdfsPath the path of the file for which to get the last
+ * modification date.
+ * @return the joda DateTime of the last modification of the given file
+ */
+ def getFileModificationDateTime(hdfsPath: String): DateTime = {
+ new DateTime(
+ FileSystem.get(
+ new Configuration()
+ ).getFileStatus(
+ new Path(hdfsPath)
+ ).getModificationTime()
+ )
+ }
+
+ /** Returns the stringified date of the last modification of the given file.
+ *
+ * {{{
+ * assert(HdfsHelper.getFileModificationDate("my/hdfs/file/path.txt") == "20170306")
+ * }}}
+ *
+ * @param hdfsPath the path of the file for which to get the last
+ * modification date.
+ * @param format (default = "yyyyMMdd") the format under which to get the
+ * modification date.
+ * @return the stringified date of the last modification of the given file,
+ * under the provided format.
+ */
+ def getFileModificationDate(hdfsPath: String, format: String = "yyyyMMdd"): String = {
+ DateTimeFormat.forPattern(format).print(
+ getFileModificationDateTime(hdfsPath)
+ )
+ }
+
+ /** Returns the joda DateTime of the last modification of the given folder.
+ *
+ * @param hdfsPath the path of the folder for which to get the last
+ * modification date.
+ * @return the joda DateTime of the last modification of the given folder
+ */
+ def getFolderModificationDateTime(hdfsPath: String): DateTime = {
+ getFileModificationDateTime(hdfsPath)
+ }
+
+ /** Returns the stringified date of the last modification of the given folder.
+ *
+ * {{{
+	 * assert(HdfsHelper.getFolderModificationDate("my/hdfs/folder") == "20170306")
+ * }}}
+ *
+ * @param hdfsPath the path of the folder for which to get the last
+ * modification date.
+ * @param format (default = "yyyyMMdd") the format under which to get the
+ * modification date.
+ * @return the stringified date of the last modification of the given
+ * folder, under the provided format.
+ */
+ def getFolderModificationDate(
+ hdfsPath: String, format: String = "yyyyMMdd"
+ ): String = {
+ getFileModificationDate(hdfsPath, format)
+ }
+
+ /** Returns the nbr of days since the given file has been last modified.
+ *
+ * {{{
+ * assert(HdfsHelper.getNbrOfDaysSinceFileWasLastModified("my/hdfs/file/path.txt") == 3)
+ * }}}
+ *
+ * @param hdfsPath the path of the file for which we want the nbr of days
+ * since the last modification.
+ * @return the nbr of days since the given file has been last modified
+ */
+ def getNbrOfDaysSinceFileWasLastModified(hdfsPath: String): Int = {
+ Days.daysBetween(
+ getFileModificationDateTime(hdfsPath), new DateTime()
+ ).getDays()
+ }
+
+ /** Appends a header and a footer to a file.
+ *
+	 * Useful when creating an xml file with spark and you need to add top
+ * level tags.
+ *
+	 * If the workingFolderPath parameter is provided, the processing is done
+	 * in a working/tmp folder and only then is the final file moved to its
+	 * final location. This way, in case of cluster instability, i.e. in case
+	 * the Spark job is interrupted, this avoids having a temporary or
+	 * corrupted file in output.
+ *
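+	 * A minimal usage sketch (path, header and footer are illustrative):
+	 *
+	 * {{{
+	 * HdfsHelper.appendHeaderAndFooter("my/hdfs/file/path.xml", "<root>", "</root>")
+	 * }}}
+	 *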
+ * @param filePath the path of the file for which to add the header and the
+ * footer.
+ * @param header the header to add
+ * @param footer the footer to add
+ * @param workingFolderPath the path where file manipulations will happen
+ */
+ def appendHeaderAndFooter(
+ filePath: String, header: String, footer: String,
+ workingFolderPath: String = ""
+ ) = {
+ appendHeaderAndFooterInternal(
+ filePath, Some(header), Some(footer), workingFolderPath
+ )
+ }
+
+ /** Appends a header to a file.
+ *
+	 * Useful when creating a csv file with spark and you need to add a header
+ * describing the different fields.
+ *
+	 * If the workingFolderPath parameter is provided, the processing is done
+	 * in a working/tmp folder and only then is the final file moved to its
+	 * final location. This way, in case of cluster instability, i.e. in case
+	 * the Spark job is interrupted, this avoids having a temporary or
+	 * corrupted file in output.
+ *
+ * @param filePath the path of the file for which to add the header
+ * @param header the header to add
+ * @param workingFolderPath the path where file manipulations will happen
+ */
+ def appendHeader(
+ filePath: String, header: String, workingFolderPath: String = ""
+ ) = {
+ appendHeaderAndFooterInternal(
+ filePath, Some(header), None, workingFolderPath
+ )
+ }
+
+ /** Appends a footer to a file.
+ *
+	 * If the workingFolderPath parameter is provided, the processing is done
+	 * in a working/tmp folder and only then is the final file moved to its
+	 * final location. This way, in case of cluster instability, i.e. in case
+	 * the Spark job is interrupted, this avoids having a temporary or
+	 * corrupted file in output.
+ *
+ * @param filePath the path of the file for which to add the footer
+ * @param footer the footer to add
+ * @param workingFolderPath the path where file manipulations will happen
+ */
+ def appendFooter(
+ filePath: String, footer: String, workingFolderPath: String = ""
+ ) = {
+ appendHeaderAndFooterInternal(
+ filePath, None, Some(footer), workingFolderPath
+ )
+ }
+
+ /** Validates an XML file on hdfs in regard to the given XSD.
+ *
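+ * A minimal sketch of a call (paths are illustrative), using the
+ * resources-folder tip given for the xsdFile parameter below:
+ * {{{
+ * val xsdFile: URL = getClass.getResource("/my_file.xsd")
+ * val isValid: Boolean = HdfsHelper.isHdfsXmlCompliantWithXsd("my/hdfs/file/path.xml", xsdFile)
+ * }}}
+ *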
+ * @param hdfsXmlPath the path of the file on hdfs for which to validate
+ * the compliance with the given xsd.
+ * @param xsdFile the xsd file. The easiest is to put your xsd file within
+ * your resources folder (src/main/resources) and then get it as an URL
+ * with getClass.getResource("/my_file.xsd").
+ * @return if the xml is compliant with the xsd
+ */
+ def isHdfsXmlCompliantWithXsd(hdfsXmlPath: String, xsdFile: URL): Boolean = {
+ try {
+ validateHdfsXmlWithXsd(hdfsXmlPath, xsdFile)
+ true
+ } catch {
+ case saxe: SAXException => false
+ }
+ }
+
+ /** Validates an XML file on hdfs against the given XSD.
+ *
+ * Returns nothing and doesn't catch the exception if the xml is not valid.
+ * This way you can catch the error yourself and analyse it.
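+ *
+ * For instance, to analyse the error yourself (a sketch; paths are
+ * illustrative):
+ * {{{
+ * try {
+ * HdfsHelper.validateHdfsXmlWithXsd("my/hdfs/file/path.xml", getClass.getResource("/my_file.xsd"))
+ * } catch {
+ * case saxe: SAXException => println(saxe.getMessage)
+ * }
+ * }}}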
+ *
+ * @param hdfsXmlPath the path of the file on hdfs for which to validate
+ * the compliance with the given xsd.
+ * @param xsdFile the xsd file. The easiest is to put your xsd file within
+ * your resources folder (src/main/resources) and then get it as an URL
+ * with getClass.getResource("/my_file.xsd").
+ */
+ def validateHdfsXmlWithXsd(hdfsXmlPath: String, xsdFile: URL) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val xmlFile = new StreamSource(fileSystem.open(new Path(hdfsXmlPath)))
+
+ val schemaFactory = SchemaFactory.newInstance(
+ XMLConstants.W3C_XML_SCHEMA_NS_URI
+ )
+
+ val validator = schemaFactory.newSchema(xsdFile).newValidator()
+
+ validator.validate(xmlFile)
+ }
+
+ /** Loads a typesafe config from Hdfs.
+ *
+ * Typesafe config (HOCON) is a configuration format which looks like this:
+ * {{{
+ * config {
+ * airlines = [
+ * {
+ * code = QF
+ * window_size_in_hour = 6
+ * kpis {
+ * search_count_threshold = 25000
+ * popularity_count_threshold = 400
+ * ratio_key_vs_nested_key_threshold = 20
+ * }
+ * }
+ * {
+ * code = AF
+ * window_size_in_hour = 6
+ * kpis {
+ * search_count_threshold = 100000
+ * popularity_count_threshold = 800
+ * ratio_key_vs_nested_key_threshold = 20
+ * }
+ * }
+ * ]
+ * }
+ * }}}
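+ *
+ * A sketch of a possible usage (the path and keys are illustrative):
+ * {{{
+ * val config = HdfsHelper.loadTypesafeConfigFromHdfs("my/hdfs/config/path.conf")
+ * val airlines = config.getConfigList("config.airlines")
+ * }}}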
+ *
+ * @param hdfsConfigPath the absolute path of the typesafe config file on
+ * hdfs we want to load as a typesafe Config object.
+ * @return the com.typesafe.config.Config object which contains usable
+ * data.
+ */
+ def loadTypesafeConfigFromHdfs(hdfsConfigPath: String): Config = {
+
+ val reader = new InputStreamReader(
+ FileSystem.get(new Configuration()).open(new Path(hdfsConfigPath))
+ )
+
+ try { ConfigFactory.parseReader(reader) } finally { reader.close() }
+ }
+
+ /** Loads an Xml file from Hdfs as a scala.xml.Elem object.
+ *
+ * For xml files too big to fit in memory, consider using the Spark API
+ * instead.
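+ *
+ * For instance (illustrative path):
+ * {{{
+ * val xml: Elem = HdfsHelper.loadXmlFileFromHdfs("my/hdfs/file/path.xml")
+ * }}}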
+ *
+ * @param hdfsXmlPath the path of the xml file on hdfs
+ * @return the scala.xml.Elem object
+ */
+ def loadXmlFileFromHdfs(hdfsXmlPath: String): Elem = {
+
+ val reader = new InputStreamReader(
+ FileSystem.get(new Configuration()).open(new Path(hdfsXmlPath))
+ )
+
+ try { XML.load(reader) } finally { reader.close() }
+ }
+
+ /** Internal implementation of the addition to a file of header and footer.
+ *
+ * @param filePath the path of the file for which to add the header and the
+ * footer.
+ * @param header the header to add
+ * @param footer the footer to add
+ * @param workingFolderPath the path where file manipulations will happen
+ */
+ private def appendHeaderAndFooterInternal(
+ filePath: String, header: Option[String], footer: Option[String],
+ workingFolderPath: String
+ ) = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ val tmpOutputPath = workingFolderPath match {
+ case "" => filePath + ".tmp"
+ case _ => workingFolderPath + "/xml.tmp"
+ }
+ deleteFile(tmpOutputPath)
+
+ val inputFile = fileSystem.open(new Path(filePath))
+ val tmpOutputFile = fileSystem.create(new Path(tmpOutputPath))
+
+ if (header != None)
+ tmpOutputFile.write((header.get + "\n").getBytes("UTF-8"))
+
+ try {
+ IOUtils.copyBytes(inputFile, tmpOutputFile, new Configuration(), false)
+ } finally {
+ inputFile.close()
+ }
+
+ if (footer != None)
+ tmpOutputFile.write(footer.get.getBytes("UTF-8"))
+
+ // The tmp output stream must be closed (and thus flushed) before the
+ // produced file is moved to its final location:
+ tmpOutputFile.close()
+
+ deleteFile(filePath)
+ moveFile(tmpOutputPath, filePath)
+ }
+}
diff --git a/src/main/scala/com/spark_helper/SparkHelper.scala b/src/main/scala/com/spark_helper/SparkHelper.scala
new file mode 100644
index 0000000..09bbee1
--- /dev/null
+++ b/src/main/scala/com/spark_helper/SparkHelper.scala
@@ -0,0 +1,526 @@
+package com.spark_helper
+
+import org.apache.spark.HashPartitioner
+import org.apache.spark.rdd.RDD
+import org.apache.spark.SparkContext
+
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.FileSystem
+import org.apache.hadoop.fs.FileUtil
+import org.apache.hadoop.fs.Path
+import org.apache.hadoop.io.compress.CompressionCodec
+import org.apache.hadoop.io.LongWritable
+import org.apache.hadoop.io.NullWritable
+import org.apache.hadoop.io.Text
+import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
+
+import scala.util.Random
+
+/** A facility to deal with RDD/file manipulations based on the Spark API.
+ *
+ * The goal is to remove as much of the frequently used low-level code as
+ * possible from your Spark job and replace it with fully tested methods
+ * whose names are self-explanatory/readable.
+ *
+ * A few examples:
+ *
+ * {{{
+ * // Same as SparkContext.saveAsTextFile, but the result is a single file:
+ * SparkHelper.saveAsSingleTextFile(myOutputRDD, "/my/output/file/path.txt")
+ * // Same as SparkContext.textFile, but instead of reading one record per line,
+ * // it reads records spread over several lines:
+ * SparkHelper.textFileWithDelimiter("/my/input/folder/path", sparkContext, "---\n")
+ * }}}
+ *
+ * Source SparkHelper
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+object SparkHelper extends Serializable {
+
+ /** Saves an RDD in exactly one file.
+ *
+ * Allows one to save an RDD in one file, while keeping the processing
+ * parallelized.
+ *
+ * {{{ SparkHelper.saveAsSingleTextFile(myRddToStore, "/my/file/path.txt") }}}
+ *
+ * @param outputRDD the RDD of strings to store in one file
+ * @param outputFile the path of the produced file
+ */
+ def saveAsSingleTextFile(outputRDD: RDD[String], outputFile: String) = {
+ saveAsSingleTextFileInternal(outputRDD, outputFile, None)
+ }
+
+ /** Saves an RDD in exactly one file.
+ *
+ * Allows one to save an RDD in one file, while keeping the processing
+ * parallelized.
+ *
+ * {{{
+ * SparkHelper.saveAsSingleTextFile(myRddToStore, "/my/file/path.txt", classOf[BZip2Codec])
+ * }}}
+ *
+ * @param outputRDD the RDD of strings to store in one file
+ * @param outputFile the path of the produced file
+ * @param compressionCodec the type of compression to use (for instance
+ * classOf[BZip2Codec] or classOf[GzipCodec])
+ */
+ def saveAsSingleTextFile(
+ outputRDD: RDD[String], outputFile: String,
+ compressionCodec: Class[_ <: CompressionCodec]
+ ) = {
+ saveAsSingleTextFileInternal(outputRDD, outputFile, Some(compressionCodec))
+ }
+
+ /** Saves an RDD in exactly one file.
+ *
+ * Allows one to save an RDD in one file, while keeping the processing
+ * parallelized.
+ *
+ * This variant of saveAsSingleTextFile performs the storage in a temporary
+ * folder instead of directly in the final output folder. This way the risk
+ * of having corrupted files in the real output folder due to cluster
+ * interruptions is minimized.
+ *
+ * {{{
+ * SparkHelper.saveAsSingleTextFile(myRddToStore, "/my/file/path.txt", "/my/working/folder/path")
+ * }}}
+ *
+ * @param outputRDD the RDD of strings to store in one file
+ * @param outputFile the path of the produced file
+ * @param workingFolder the path where file manipulations will temporarily
+ * happen.
+ */
+ def saveAsSingleTextFile(
+ outputRDD: RDD[String], outputFile: String, workingFolder: String
+ ) = {
+ saveAsSingleTextFileWithWorkingFolderInternal(
+ outputRDD, outputFile, workingFolder, None
+ )
+ }
+
+ /** Saves an RDD in exactly one file.
+ *
+ * Allows one to save an RDD in one file, while keeping the processing
+ * parallelized.
+ *
+ * This variant of saveAsSingleTextFile performs the storage in a temporary
+ * folder instead of directly in the final output folder. This way the risk
+ * of having corrupted files in the real output folder due to cluster
+ * interruptions is minimized.
+ *
+ * {{{
+ * SparkHelper.saveAsSingleTextFile(myRddToStore, "/my/file/path.txt", "/my/working/folder/path", classOf[BZip2Codec])
+ * }}}
+ *
+ * @param outputRDD the RDD of strings to store in one file
+ * @param outputFile the path of the produced file
+ * @param workingFolder the path where file manipulations will temporarily
+ * happen.
+ * @param compressionCodec the type of compression to use (for instance
+ * classOf[BZip2Codec] or classOf[GzipCodec])
+ */
+ def saveAsSingleTextFile(
+ outputRDD: RDD[String], outputFile: String, workingFolder: String,
+ compressionCodec: Class[_ <: CompressionCodec]
+ ) = {
+ saveAsSingleTextFileWithWorkingFolderInternal(
+ outputRDD, outputFile, workingFolder, Some(compressionCodec)
+ )
+ }
+
+ /** Equivalent to sparkContext.textFile(), but for a specific record delimiter.
+ *
+ * By default, sparkContext.textFile() will provide one record per line.
+ * But what if the format you want to read considers that one record (one
+ * entity) is stored in more than one line (yml, xml, ...)?
+ *
+ * For instance, in order to read a yml file, in which a record (a single
+ * entity) is spread over several lines, you can set the record delimiter to
+ * "---\n" instead of "\n". The same goes when reading an xml file where a
+ * record might be spread over several lines or, worse, where the whole xml
+ * file is one single line.
+ *
+ * {{{
+ * // Let's say the data we want to use with Spark looks like this (one
+ * // record is a customer, but it's spread over several lines; the xml tags
+ * // below are purely illustrative):
+ * <Customers>\n
+ * <Customer>\n
+ * <Address>34 thingy street, someplace, sometown</Address>\n
+ * </Customer>\n
+ * <Customer>\n
+ * <Address>12 thingy street, someplace, sometown</Address>\n
+ * </Customer>\n
+ * </Customers>\n
+ *
+ * // Then you can use it this way:
+ * val computedRecords = SparkHelper.textFileWithDelimiter(
+ * "my/path/to/customers.xml", sparkContext, "<Customer>\n"
+ * ).collect()
+ * val expectedRecords = Array(
+ * "<Customers>\n",
+ * (
+ * "<Address>34 thingy street, someplace, sometown</Address>\n" +
+ * "</Customer>\n"
+ * ),
+ * (
+ * "<Address>12 thingy street, someplace, sometown</Address>\n" +
+ * "</Customer>\n" +
+ * "</Customers>\n"
+ * )
+ * )
+ * assert(computedRecords.sameElements(expectedRecords))
+ * }}}
+ *
+ * @param hdfsPath the path of the file to read (folder or file, '*' works
+ * as well).
+ * @param sparkContext the SparkContext
+ * @param delimiter the specific record delimiter which replaces "\n"
+ * @return the RDD of records
+ */
+ def textFileWithDelimiter(
+ hdfsPath: String, sparkContext: SparkContext, delimiter: String
+ ): RDD[String] = {
+
+ val conf = new Configuration(sparkContext.hadoopConfiguration)
+ conf.set("textinputformat.record.delimiter", delimiter)
+
+ sparkContext.newAPIHadoopFile(
+ hdfsPath, classOf[TextInputFormat],
+ classOf[LongWritable], classOf[Text], conf
+ ).map {
+ case (_, text) => text.toString
+ }
+ }
+
+ /** Saves and repartitions a key/value RDD on files whose name is the key.
+ *
+ * Within the provided outputFolder, there will be one file per key in your
+ * keyValueRDD, and a given key's file only contains values for that key.
+ *
+ * You need to know the nbr of keys beforehand (in general you use this to
+ * split your dataset into subsets, or to output one file per client, so you
+ * know how many keys you have). So you need to pass as keyNbr the exact nbr
+ * of keys you'll have.
+ *
+ * This is not scalable and shouldn't be used for data flows with normal or
+ * big volumes.
+ *
+ * {{{
+ * SparkHelper.saveAsTextFileByKey(myKeyValueRddToStore, "/my/output/folder/path", 12)
+ * }}}
+ *
+ * @param keyValueRDD the key/value RDD
+ * @param outputFolder the folder where the key files will be stored
+ * @param keyNbr the nbr of expected keys (which is the nbr of output
+ * files).
+ */
+ def saveAsTextFileByKey(
+ keyValueRDD: RDD[(String, String)], outputFolder: String, keyNbr: Int
+ ) = {
+
+ HdfsHelper.deleteFolder(outputFolder)
+
+ keyValueRDD.partitionBy(
+ new HashPartitioner(keyNbr)
+ ).saveAsHadoopFile(
+ outputFolder, classOf[String], classOf[String], classOf[KeyBasedOutput]
+ )
+ }
+
+ /** Saves and repartitions a key/value RDD on files whose name is the key.
+ *
+ * Within the provided outputFolder, there will be one file per key in your
+ * keyValueRDD, and a given key's file only contains values for that key.
+ *
+ * You need to know the nbr of keys beforehand (in general you use this to
+ * split your dataset into subsets, or to output one file per client, so you
+ * know how many keys you have). So you need to pass as keyNbr the exact nbr
+ * of keys you'll have.
+ *
+ * This is not scalable and shouldn't be used for data flows with normal or
+ * big volumes.
+ *
+ * {{{
+ * SparkHelper.saveAsTextFileByKey(myKeyValueRddToStore, "/my/output/folder/path", 12, classOf[BZip2Codec])
+ * }}}
+ *
+ * @param keyValueRDD the key/value RDD
+ * @param outputFolder the folder where the key files will be stored
+ * @param keyNbr the nbr of expected keys (which is the nbr of output
+ * files).
+ * @param compressionCodec the type of compression to use (for instance
+ * classOf[BZip2Codec] or classOf[GzipCodec])
+ */
+ def saveAsTextFileByKey(
+ keyValueRDD: RDD[(String, String)], outputFolder: String, keyNbr: Int,
+ compressionCodec: Class[_ <: CompressionCodec]
+ ) = {
+
+ HdfsHelper.deleteFolder(outputFolder)
+
+ keyValueRDD.partitionBy(
+ new HashPartitioner(keyNbr)
+ ).saveAsHadoopFile(
+ outputFolder, classOf[String], classOf[String],
+ classOf[KeyBasedOutput], compressionCodec
+ )
+ }
+
+ /** Decreases the nbr of partitions of a folder.
+ *
+ * This is often handy when the last step of your job needs to run on
+ * thousands of files, but you want to store your final output on, say, only
+ * 300 files.
+ *
+ * It's like a FileUtil.copyMerge, but the merging produces more than one
+ * file.
+ *
+ * Be aware that this method deletes the provided input folder.
+ *
+ * {{{
+ * SparkHelper.decreaseCoalescence(
+ * "/folder/path/with/2000/files", "/produced/folder/path/with/only/300/files", 300, sparkContext
+ * )
+ * }}}
+ *
+ * @param highCoalescenceLevelFolder the folder which contains 10000 files
+ * @param lowerCoalescenceLevelFolder the folder which will contain the
+ * same data as highCoalescenceLevelFolder but spread on only 300 files (
+ * where 300 is the finalCoalescenceLevel parameter).
+ * @param finalCoalescenceLevel the nbr of files within the folder at the
+ * end of this method.
+ * @param sparkContext the SparkContext
+ */
+ def decreaseCoalescence(
+ highCoalescenceLevelFolder: String, lowerCoalescenceLevelFolder: String,
+ finalCoalescenceLevel: Int, sparkContext: SparkContext
+ ) = {
+ decreaseCoalescenceInternal(
+ highCoalescenceLevelFolder, lowerCoalescenceLevelFolder,
+ finalCoalescenceLevel, sparkContext, None
+ )
+ }
+
+ /** Decreases the nbr of partitions of a folder.
+ *
+ * This is often handy when the last step of your job needs to run on
+ * thousands of files, but you want to store your final output on, say, only
+ * 300 files.
+ *
+ * It's like a FileUtil.copyMerge, but the merging produces more than one
+ * file.
+ *
+ * Be aware that this method deletes the provided input folder.
+ *
+ * {{{
+ * SparkHelper.decreaseCoalescence(
+ * "/folder/path/with/2000/files", "/produced/folder/path/with/only/300/files", 300, sparkContext, classOf[BZip2Codec]
+ * )
+ * }}}
+ *
+ * @param highCoalescenceLevelFolder the folder which contains 10000 files
+ * @param lowerCoalescenceLevelFolder the folder which will contain the
+ * same data as highCoalescenceLevelFolder but spread on only 300 files (
+ * where 300 is the finalCoalescenceLevel parameter).
+ * @param finalCoalescenceLevel the nbr of files within the folder at the
+ * end of this method.
+ * @param sparkContext the SparkContext
+ * @param compressionCodec the type of compression to use (for instance
+ * classOf[BZip2Codec] or classOf[GzipCodec])
+ */
+ def decreaseCoalescence(
+ highCoalescenceLevelFolder: String, lowerCoalescenceLevelFolder: String,
+ finalCoalescenceLevel: Int, sparkContext: SparkContext,
+ compressionCodec: Class[_ <: CompressionCodec]
+ ) = {
+ decreaseCoalescenceInternal(
+ highCoalescenceLevelFolder, lowerCoalescenceLevelFolder,
+ finalCoalescenceLevel, sparkContext, Some(compressionCodec)
+ )
+ }
+
+ /** Saves as text file, but by decreasing the nbr of partitions of the output.
+ *
+ * Same as decreaseCoalescence, but the storage of the RDD in an
+ * intermediate folder is included.
+ *
+ * This still makes the processing parallelized, but the output is
+ * coalesced.
+ *
+ * {{{
+ * SparkHelper.saveAsTextFileAndCoalesce(myRddToStore, "/produced/folder/path/with/only/300/files", 300)
+ * }}}
+ *
+ * @param outputRDD the RDD to store, processed for instance on 10000 tasks
+ * (which would thus be stored as 10000 files).
+ * @param outputFolder the folder where the RDD will finally be stored, but
+ * spread over only 300 files (where 300 is the value of the
+ * finalCoalescenceLevel parameter).
+ * @param finalCoalescenceLevel the nbr of files within the folder at the
+ * end of this method.
+ */
+ def saveAsTextFileAndCoalesce(
+ outputRDD: RDD[String], outputFolder: String, finalCoalescenceLevel: Int
+ ) = {
+
+ val sparkContext = outputRDD.context
+
+ // We remove folders where to store data in case they already exist:
+ HdfsHelper.deleteFolder(outputFolder + "_tmp")
+ HdfsHelper.deleteFolder(outputFolder)
+
+ // We first save the rdd with the level of coalescence used during the
+ // processing. This way the processing is done with the right level of
+ // tasks:
+ outputRDD.saveAsTextFile(outputFolder + "_tmp")
+
+ // Then we read back this tmp folder, apply the coalesce and store it
+ // back:
+ decreaseCoalescenceInternal(
+ outputFolder + "_tmp", outputFolder,
+ finalCoalescenceLevel, sparkContext, None
+ )
+ }
+
+ /** Saves as text file, but by decreasing the nbr of partitions of the output.
+ *
+ * Same as decreaseCoalescence, but the storage of the RDD in an
+ * intermediate folder is included.
+ *
+ * This still makes the processing parallelized, but the output is
+ * coalesced.
+ *
+ * {{{
+ * SparkHelper.saveAsTextFileAndCoalesce(myRddToStore, "/produced/folder/path/with/only/300/files", 300, classOf[BZip2Codec])
+ * }}}
+ *
+ * @param outputRDD the RDD to store, processed for instance on 10000 tasks
+ * (which would thus be stored as 10000 files).
+ * @param outputFolder the folder where the RDD will finally be stored, but
+ * spread over only 300 files (where 300 is the value of the
+ * finalCoalescenceLevel parameter).
+ * @param finalCoalescenceLevel the nbr of files within the folder at the
+ * end of this method.
+ * @param compressionCodec the type of compression to use (for instance
+ * classOf[BZip2Codec] or classOf[GzipCodec])
+ */
+ def saveAsTextFileAndCoalesce(
+ outputRDD: RDD[String], outputFolder: String, finalCoalescenceLevel: Int,
+ compressionCodec: Class[_ <: CompressionCodec]
+ ) = {
+
+ val sparkContext = outputRDD.context
+
+ // We remove folders where to store data in case they already exist:
+ HdfsHelper.deleteFolder(outputFolder + "_tmp")
+ HdfsHelper.deleteFolder(outputFolder)
+
+ // We first save the rdd with the level of coalescence used during the
+ // processing. This way the processing is done with the right level of
+ // tasks:
+ outputRDD.saveAsTextFile(outputFolder + "_tmp")
+
+ // Then we read back this tmp folder, apply the coalesce and store it
+ // back:
+ decreaseCoalescenceInternal(
+ outputFolder + "_tmp", outputFolder,
+ finalCoalescenceLevel, sparkContext, Some(compressionCodec)
+ )
+ }
+
+ //////
+ // Internal core:
+ //////
+
+ private def saveAsSingleTextFileWithWorkingFolderInternal(
+ outputRDD: RDD[String], outputFile: String, workingFolder: String,
+ compressionCodec: Option[Class[_ <: CompressionCodec]]
+ ) {
+
+ // We choose a random name for the temporary file:
+ val temporaryName = Random.alphanumeric.take(10).mkString("")
+
+ // We perform the merge into a temporary single text file:
+ saveAsSingleTextFileInternal(
+ outputRDD, workingFolder + "/" + temporaryName, compressionCodec
+ )
+
+ // Only then do we move the resulting file to its final real location:
+ HdfsHelper.moveFile(
+ workingFolder + "/" + temporaryName, outputFile,
+ overwrite = true
+ )
+ }
+
+ /** Saves an RDD in exactly one file.
+ *
+ * Allows one to save an RDD as one text file, but at the same time to keep
+ * the processing parallelized.
+ *
+ * @param outputRDD the RDD of strings to save as text file
+ * @param outputFile the path where to save the file
+ * @param compressionCodec the compression codec to use (can be left to None)
+ */
+ private def saveAsSingleTextFileInternal(
+ outputRDD: RDD[String], outputFile: String,
+ compressionCodec: Option[Class[_ <: CompressionCodec]]
+ ): Unit = {
+
+ val fileSystem = FileSystem.get(new Configuration())
+
+ // Classic saveAsTextFile in a temporary folder:
+ HdfsHelper.deleteFolder(outputFile + ".tmp")
+ if (compressionCodec.isEmpty)
+ outputRDD.saveAsTextFile(outputFile + ".tmp")
+ else
+ outputRDD.saveAsTextFile(outputFile + ".tmp", compressionCodec.get)
+
+ // Merge the folder into a single file:
+ HdfsHelper.deleteFile(outputFile)
+ FileUtil.copyMerge(
+ fileSystem, new Path(outputFile + ".tmp"),
+ fileSystem, new Path(outputFile),
+ true, new Configuration(), null
+ )
+ HdfsHelper.deleteFolder(outputFile + ".tmp")
+ }
+
+ private def decreaseCoalescenceInternal(
+ highCoalescenceLevelFolder: String, lowerCoalescenceLevelFolder: String,
+ finalCoalescenceLevel: Int, sparkContext: SparkContext,
+ compressionCodec: Option[Class[_ <: CompressionCodec]]
+ ): Unit = {
+
+ val intermediateRDD = sparkContext.textFile(
+ highCoalescenceLevelFolder
+ ).coalesce(
+ finalCoalescenceLevel
+ )
+
+ if (compressionCodec.isEmpty)
+ intermediateRDD.saveAsTextFile(lowerCoalescenceLevelFolder)
+ else
+ intermediateRDD.saveAsTextFile(
+ lowerCoalescenceLevelFolder, compressionCodec.get
+ )
+
+ HdfsHelper.deleteFolder(highCoalescenceLevelFolder)
+ }
+}
+
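+/** Output format used by saveAsTextFileByKey: the key itself isn't written
+ * in the output (NullWritable), but it is used as the name of the file in
+ * which the corresponding values are stored. */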
+private class KeyBasedOutput extends MultipleTextOutputFormat[Any, Any] {
+
+ override def generateActualKey(key: Any, value: Any): Any = NullWritable.get()
+
+ override def generateFileNameForKeyValue(
+ key: Any, value: Any, name: String
+ ): String = {
+ key.asInstanceOf[String]
+ }
+}
diff --git a/src/main/scala/com/spark_helper/monitoring/Monitor.scala b/src/main/scala/com/spark_helper/monitoring/Monitor.scala
new file mode 100644
index 0000000..8da0839
--- /dev/null
+++ b/src/main/scala/com/spark_helper/monitoring/Monitor.scala
@@ -0,0 +1,472 @@
+package com.spark_helper.monitoring
+
+import com.spark_helper.DateHelper
+import com.spark_helper.HdfsHelper
+
+import org.apache.spark.SparkContext
+
+import java.util.Calendar
+
+import org.apache.commons.lang3.time.DurationFormatUtils
+
+import java.lang.Throwable
+
+/** A facility used to monitor a Spark job.
+ *
+ * It's a simple logger/report that you update during your job. It contains a
+ * report (a simple string) that you can update and a success boolean which can
+ * be updated to give a success status on your job. At the end of your job
+ * you'll have the possibility to store the report in hdfs.
+ *
+ * Let's go through a simple Spark job example monitored with this Monitor
+ * facility:
+ *
+ * {{{
+ * val sparkContext = new SparkContext(new SparkConf())
+ * val monitor = new Monitor("My Simple Job")
+ *
+ * try {
+ *
+ * // Let's perform a Spark pipeline which might go wrong:
+ * val processedData = sparkContext.textFile("/my/hdfs/input/path").map(do whatever)
+ *
+ * // Let's say you want to get some KPIs on your output before storing it:
+ * val outputIsValid = monitor.updateByKpisValidation(
+ * List(
+ * new Test("Nbr of output records", processedData.count(), "superior to", 10e6f, "nbr"),
+ * new Test("Some pct of invalid output", your_complex_kpi, "inferior to", 3, "pct")
+ * ),
+ * "My pipeline descirption"
+ * )
+ *
+ * if (outputIsValid)
+ * processedData.saveAsTextFile("wherever/folder")
+ *
+ * } catch {
+ * case iie: InvalidInputException => {
+ * monitor.updateReportWithError(iie, "My pipeline description", diagnostic = "No input data!")
+ * }
+ * case e: Throwable => {
+ * monitor.updateReportWithError(e, "My pipeline description")
+ * }
+ * }
+ *
+ * if (monitor.isSuccess()) {
+ * val doMore = "Let's do more stuff!"
+ * monitor.updateReport("My second pipeline description: success")
+ * }
+ *
+ * // At the end of the different steps of the job, we can store the report in HDFS:
+ * monitor.saveReport("/my/hdfs/functionnal/logs/folder")
+ *
+ * // At the end of your job, if you consider your job isn't successful, then crash it!:
+ * if (!monitor.isSuccess()) throw new Exception()
+ * }}}
+ *
+ * If we were to read the stored report after this simple pipeline, here are
+ * some possible scenarios:
+ *
+ * First scenario, problem with the input of the job:
+ * {{{
+ * My Simple Job
+ *
+ * [10:23] Beginning
+ * [10:23-10:23] My pipeline description: failed
+ * Diagnostic: No input data!
+ * org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://my/hdfs/input/path
+ * at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
+ * at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
+ * ...
+ * [10:23] Duration: 00:00:00
+ * }}}
+ *
+ * Another scenario, unexpected problem:
+ * {{{
+ * My Simple Job
+ *
+ * [10:23] Beginning
+ * [10:23-10:36] My pipeline description: failed
+ * java.lang.NumberFormatException: For input string: "a"
+ * java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
+ * java.lang.Integer.parseInt(Integer.java:492)
+ * ...
+ * [10:36] Duration: 00:13:47
+ * }}}
+ *
+ * Another scenario, successful Spark pipeline and KPIs are valid; all good!:
+ * {{{
+ * My Simple Job
+ *
+ * [10:23] Beginning
+ * [10:23-10:41] My pipeline description: success
+ * KPI: Nbr of output records
+ * Value: 14669071.0
+ * Must be superior to 10000000.0
+ * Validated: true
+ * KPI: Some pct of invalid output
+ * Value: 0.06%
+ * Must be inferior to 3.0%
+ * Validated: true
+ * [10:41-10:42] My second pipeline description: success
+ * [10:42] Duration: 00:19:23
+ * }}}
+ *
+ * One of the good things about this facility is that it catches Spark
+ * exceptions and stores them within a log file. This makes it a lot easier
+ * to quickly find what went wrong than having to dig back through yarn logs.
+ *
+ * It comes in handy in production environments where locating the yarn logs
+ * of your Spark job can be a pain, or to send the error report by email when
+ * the production job fails, or simply to keep track of historical kpis,
+ * processing times, ...
+ *
+ * This is not supposed to be updated from within a Spark pipeline
+ * (actions/transformations) but rather from the orchestration of the
+ * pipelines.
+ *
+ * Source Monitor
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ *
+ * @constructor Creates a Monitor object.
+ *
+ * Creating the Monitor object like this:
+ * {{{ new Monitor("My Spark Job Title", "someone@box.com", "Whatever pretty descritpion.") }}}
+ * will result in the report to start like this:
+ * {{{
+ * My Spark Job Title
+ *
+ * Point of contact: someone@box.com
+ * Whatever pretty description.
+ * [..:..] Beginning
+ * }}}
+ *
+ * @param reportTitle (optional) what's output as the first line of the report
+ * @param pointOfContact (optional) the persons in charge of the job
+ * @param additionalInfo (optional) anything you want written at the beginning
+ * of your report.
+ */
+class Monitor(
+ reportTitle: String = "", pointOfContact: String = "",
+ additionalInfo: String = ""
+) {
+
+ private var success = true
+ private var report = initiateReport()
+
+ private val begining = Calendar.getInstance().getTimeInMillis()
+
+ private var lastReportUpdate = DateHelper.now("HH:mm")
+
+ /** Returns whether, at this point, all previous stages were successful.
+ *
+ * @return whether your Spark job is successful so far.
+ */
+ def isSuccess(): Boolean = success
+
+ /** Returns the current state of the monitoring report.
+ *
+ * @return the report.
+ */
+ def getReport(): String = report
+
+ /** Updates the report with some text.
+ *
+ * Using this method like this:
+ * {{{ monitor.updateReport("Some text") }}}
+ * will result in this being appended to the report:
+ * {{{ "[10:35-10:37] Some text\n" }}}
+ *
+ * @param text the text to append to the report
+ */
+ def updateReport(text: String) = {
+
+ val before = lastReportUpdate
+ val now = DateHelper.now("HH:mm")
+
+ lastReportUpdate = now
+
+ report += "[" + before + "-" + now + "]" + " " + text + "\n"
+ }
+
+ /** Updates the report with some text and a success.
+ *
+ * If the status of the monitoring was success, then it stays success. If
+ * it was failure, then it stays a failure.
+ *
+ * Using this method like this:
+ * {{{ monitor.updateReportWithSuccess("Some text") }}}
+ * will result in this being appended to the report:
+ * {{{ "[10:35-10:37] Some text: success\n" }}}
+ *
+ * @param taskDescription the text to append to the report
+ * @return true since it's a success
+ */
+ def updateReportWithSuccess(taskDescription: String): Boolean = {
+ updateReport(taskDescription + ": success")
+ true
+ }
+
+ /** Updates the report with some text and a failure.
+ *
+ * This sets the status of the monitoring to false. After that the status
+ * will never be success again, even if you update the report with success
+ * tasks.
+ *
+ * Using this method like this:
+ * {{{ monitor.updateReportWithFailure("Some text") }}}
+ * will result in this being appended to the report:
+ * {{{ "[10:35-10:37] Some text: failure\n" }}}
+ *
+ * Once the monitoring is a failure, any subsequent successful action won't
+ * change the failed status of the monitoring.
+ *
+ * @param taskDescription the text to append to the report
+ * @return false since it's a failure
+ */
+ def updateReportWithFailure(taskDescription: String): Boolean = {
+ updateReport(taskDescription + ": failed")
+ success = false
+ false
+ }
+
+ /** Updates the report with the stack trace of an error.
+ *
+ * This sets the status of the monitoring to false. After that the status
+ * will never be success again, even if you update the report with success
+ * tasks.
+ *
+ * Catching an error like this:
+ * {{{
+ * monitor.updateReportWithError(
+ * invalidInputException, "My pipeline descirption", diagnostic = "No input data!"
+ * )
+ * }}}
+ * will result in this being appended to the report:
+ * {{{
+ * [10:23-10:24] My pipeline description: failed
+ * Diagnostic: No input data!
+ * org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://my/hdfs/input/path
+ * at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:285)
+ * at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:228)
+ * ...
+ * }}}
+ *
+ * @param error the thrown error
+ * @param taskDescription the description of the step which failed
+ * @param diagnostic (optional) the message we want to add to clarify the
+ * source of the problem. By default if this parameter is not used, then no
+ * diagnostic is appended to the report.
+ * @return false since it's a failure
+ */
+ def updateReportWithError(
+ error: Throwable, taskDescription: String, diagnostic: String = ""
+ ): Boolean = {
+
+ // In addition to updating the report with the stack trace and a
+ // possible diagnostic, we set the monitoring as failed:
+ success = false
+
+ if (taskDescription != "")
+ updateReport(taskDescription + ": failed")
+
+ if (diagnostic != "")
+ report += "\tDiagnostic: " + diagnostic + "\n"
+
+ report += (
+ "\t\t" + error.toString() + "\n" +
+ error.getStackTrace.map(line => "\t\t" + line).mkString("\n") + "\n"
+ )
+
+ false
+ }
+
+ /** Updates the report by the validation of a list of kpis/tests.
+ *
+ * By providing a list of [[com.spark_helper.monitoring.Test]] objects to
+ * validate against thresholds, the report is updated with a detailed
+ * result of the validation and the success status of the monitoring is set
+ * to false if at least one KPI isn't valid.
+ *
+ * If the validation of tests is a failure then after that the status will
+ * never be success again, even if you update the report with success tasks.
+ *
+ * Using this method like this:
+ * {{{
+ * monitor.updateByKpisValidation(
+ * List(
+ * new Test("pctOfWhatever", 0.06f, "inferior to", 0.1f, "pct"),
+ * new Test("pctOfSomethingElse", 0.27f, "superior to", 0.3f, "pct"),
+ * new Test("someNbr", 1235f, "equal to", 1235f, "nbr")
+ * ),
+ * "Tests for whatever"
+ * )
+ * }}}
+ * will result in this being appended to the report:
+ * {{{
+ * [10:35-10:37] Tests for whatever: failed
+ * KPI: pctOfWhatever
+ * Value: 0.06%
+ * Must be inferior to 0.1%
+ * Validated: true
+ * KPI: pctOfSomethingElse
+ * Value: 0.27%
+ * Must be superior to 0.3%
+ * Validated: false
+ * KPI: someNbr
+ * Value: 1235.0
+ * Must be equal to 1235.0
+ * Validated: true
+ * }}}
+ *
+ * @param tests the list of Test objects to validate
+ * @param testSuitName the description of the task being tested
+ * @return if all tests were successful
+ */
+ def updateByKpisValidation(tests: List[Test], testSuitName: String): Boolean = {
+
+ val testsAreValid = !tests.map(_.isSuccess()).contains(false)
+
+ if (!testsAreValid)
+ success = false
+
+ // A title in the report for the kpi validation:
+ if (testSuitName != "") {
+ val validation = if (testsAreValid) "success" else "failed"
+ updateReport(testSuitName + ": " + validation)
+ }
+
+ // The kpi report is added to the report:
+ report += tests.map(_.stringify).mkString("\n") + "\n"
+
+ testsAreValid
+ }
+
+ /** Updates the report by the validation of a single kpi.
+ *
+ * By providing a [[com.spark_helper.monitoring.Test]] object to validate
+ * against a threshold, the report is updated with a detailed result of the
+ * validation and the success status of the monitoring is set to false if
+ * the KPI isn't valid.
+ *
+ * If the validation is a failure then after that the status will never be
+ * success again, even if you update the report with success tasks.
+ *
+ * Using this method like this:
+ * {{{
+ * monitor.updateByKpiValidation(
+ * new Test("pctOfWhatever", 0.06f, "inferior to", 0.1f, "pct"),
+ * "Tests for whatever"
+ * )
+ * }}}
+ * will result in this being appended to the report:
+ * {{{
+ * [10:35-10:37] Tests for whatever: success
+ * KPI: pctOfWhatever
+ * Value: 0.06%
+ * Must be inferior to 0.1%
+ * Validated: true
+ * }}}
+ *
+ * @param test the Test object to validate
+ * @param testSuitName the description of the task being tested
+ * @return if the test is successful
+ */
+ def updateByKpiValidation(test: Test, testSuitName: String): Boolean = {
+ updateByKpisValidation(List(test), testSuitName)
+ }
+
+ /** Saves the report in a single text file.
+ *
+ * This report will be stored in the folder provided by the parameter
+ * logFolder and its name will be either yyyyMMdd_HHmmss.log.success or
+ * yyyyMMdd_HHmmss.log.failed depending on the monitoring status.
+ *
+ * In addition to storing the report with a timestamp-based name, it is
+ * also stored under the name "current.success" or "current.failed" in the
+ * same folder in order to give it a fixed name for downstream projects to
+ * look for. Obviously, if the new status is success and the previous was
+ * failed, the previous current.failed file is deleted, and vice versa.
+ *
+ * For high-frequency jobs, it might be good not to keep all logs
+ * indefinitely. To avoid that, the purgeLogs parameter can be set to true,
+ * and the purgeWindow parameter then specifies the nbr of days after which
+ * a log file is considered outdated and purged.
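+ *
+ * For instance (the log folder below is illustrative):
+ * {{{ monitor.saveReport("/my/hdfs/functional/logs/folder", purgeLogs = true, purgeWindow = 14) }}}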
+ *
+ * @param logFolder the path of the folder in which this report is archived
+ * @param purgeLogs (default = false) if logs are purged when too old
+ * @param purgeWindow (default = 7 if purgeLogs = true) if purgeLogs is set
+ * to true, after how many days a log file is considered outdated and is
+ * purged.
+ */
+ def saveReport(
+ logFolder: String, purgeLogs: Boolean = false, purgeWindow: Int = 7
+ ) = {
+
+ // We add the job duration to the report:
+ val finalReport = report + (
+ DateHelper.now("[HH:mm]") + " Duration: " +
+ DurationFormatUtils.formatDuration(
+ Calendar.getInstance().getTimeInMillis() - begining, "HH:mm:ss"
+ )
+ )
+
+ // The extension of the report depending on the success:
+ val validationFileExtension = if (isSuccess) ".success" else ".failed"
+
+ // And we store the file as a simple text file with a name based on the
+ // timestamp:
+ HdfsHelper.writeToHdfsFile(
+ finalReport,
+ logFolder + "/" + DateHelper.now("yyyyMMdd_HHmmss") + ".log" + validationFileExtension
+ )
+
+ // But we store it as well with a fixed name such as current.success:
+ HdfsHelper.deleteFile(logFolder + "/current.success")
+ HdfsHelper.deleteFile(logFolder + "/current.failed")
+ HdfsHelper.writeToHdfsFile(
+ finalReport,
+ logFolder + "/current" + validationFileExtension
+ )
+
+ if (purgeLogs)
+ purgeOutdatedLogs(logFolder, purgeWindow)
+ }
+
+ private def initiateReport(): String = {
+
+ var initialReport = ""
+
+ if (reportTitle != "")
+ initialReport += "\t\t\t\t\t" + reportTitle + "\n\n"
+ if (pointOfContact != "")
+ initialReport += "Point of contact: " + pointOfContact + "\n"
+ if (additionalInfo != "")
+ initialReport += additionalInfo + "\n"
+
+ initialReport + DateHelper.now("[HH:mm]") + " Beginning\n"
+ }
+
+ private def purgeOutdatedLogs(logFolder: String, purgeWindow: Int) = {
+
+ val nDaysAgo = DateHelper.nDaysBefore(purgeWindow, "yyyyMMdd")
+
+ if (HdfsHelper.folderExists(logFolder)) {
+
+ HdfsHelper.listFileNamesInFolder(
+ logFolder
+ ).filter(
+ logName => !logName.startsWith("current")
+ ).filter(
+ logName => { // 20170327_1545.log.success
+ val logDate = logName.substring(0, 8) // 20170327
+ logDate < nDaysAgo
+ }
+ ).foreach(
+ logName => HdfsHelper.deleteFile(logFolder + "/" + logName)
+ )
+ }
+ }
+}
diff --git a/src/main/scala/com/spark_helper/monitoring/Test.scala b/src/main/scala/com/spark_helper/monitoring/Test.scala
new file mode 100644
index 0000000..b853244
--- /dev/null
+++ b/src/main/scala/com/spark_helper/monitoring/Test.scala
@@ -0,0 +1,86 @@
+package com.spark_helper.monitoring
+
+import java.security.InvalidParameterException
+
+import java.lang.Math.abs
+
+/** A class which represents a KPI to validate.
+ *
+ * This is intended to be used as parameter of Monitor.updateByKpiValidation
+ * and Monitor.updateByKpisValidation methods.
+ *
+ * Some examples of Test objects:
+ * {{{
+ * new Test("pctOfWhatever", 0.06f, "inferior to", 0.1f, "pct")
+ * new Test("pctOfSomethingElse", 0.27f, "superior to", 0.3f, "pct")
+ * new Test("someNbr", 1235f, "equal to", 1235f, "nbr")
+ * }}}
+ *
+ * @author Xavier Guihot
+ * @since 2016-12
+ *
+ * @constructor Creates a Test object.
+ *
+ * Some examples of Test objects:
+ * {{{
+ * new Test("pctOfWhatever", 0.06f, "inferior to", 0.1f, "pct")
+ * new Test("pctOfSomethingElse", 0.27f, "superior to", 0.3f, "pct")
+ * new Test("someNbr", 1235f, "equal to", 1235f, "nbr")
+ * }}}
+ *
+ * @param description the name/description of the KPI which will appear on the
+ * validation report.
+ * @param kpiValue the value for this KPI
+ * @param thresholdType the type of threshold ("superior to", "inferior to"
+ * or "equal to").
+ * @param appliedThreshold the threshold to apply
+ * @param kpiType the type of KPI ("pct" or "nbr")
+ */
+class Test(
+ description: String, kpiValue: Float, thresholdType: String,
+ appliedThreshold: Float, kpiType: String
+) {
+
+ // Let's check user inputs are correct:
+ {
+ if (!List("superior to", "inferior to", "equal to").contains(thresholdType))
+ throw new InvalidParameterException(
+ "The threshold type can only be \"superior to\", \"inferior to\"" +
+ "or \"equal to\", but you used: \"" + thresholdType + "\"."
+ )
+ if (!List("pct", "nbr").contains(kpiType))
+ throw new InvalidParameterException(
+ "The kpi type can only be \"pct\" or \"nbr\", but you " +
+ "used: \"" + kpiType + "\"."
+ )
+ }
+
+ /** Getter for the success of this test */
+ private[monitoring] def isSuccess(): Boolean = {
+
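+ // Nota bene: for "superior to" and "inferior to" the threshold is compared
+ // against the absolute value of the KPI, whereas "equal to" compares the
+ // raw value: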
+ if (thresholdType == "superior to")
+ abs(kpiValue) >= appliedThreshold
+
+ else if (thresholdType == "inferior to")
+ abs(kpiValue) <= appliedThreshold
+
+ else
+ kpiValue == appliedThreshold
+ }
+
+ /** Stringify a pretty report for this test */
+ private[monitoring] def stringify(): String = {
+
+ val suffix = kpiType match {
+ case "pct" => "%"
+ case "nbr" => ""
+ }
+
+ List(
+ "\tKPI: " + description,
+ "\t\tValue: " + kpiValue.toString + suffix,
+ "\t\tMust be " + thresholdType + " " + appliedThreshold + suffix,
+ "\t\tValidated: " + isSuccess().toString
+ ).mkString("\n")
+ }
+}
diff --git a/src/test/resources/some_xml.xsd b/src/test/resources/some_xml.xsd
new file mode 100644
index 0000000..08161cf
--- /dev/null
+++ b/src/test/resources/some_xml.xsd
@@ -0,0 +1,11 @@
+
+
+
+
+
+
+
+
+
+
+
\ No newline at end of file
diff --git a/src/test/scala/com/spark_helper/DateHelperTest.scala b/src/test/scala/com/spark_helper/DateHelperTest.scala
new file mode 100644
index 0000000..7b4efdd
--- /dev/null
+++ b/src/test/scala/com/spark_helper/DateHelperTest.scala
@@ -0,0 +1,66 @@
+package com.spark_helper
+
+import org.scalatest.FunSuite
+
+/** Testing facility for date helpers.
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+class DateHelperTest extends FunSuite {
+
+ test("Range of Dates") {
+
+ // 1: With the default formatter:
+ var dates = DateHelper.daysBetween("20161229", "20170103")
+ var expectedDates = List(
+ "20161229", "20161230", "20161231",
+ "20170101", "20170102", "20170103"
+ )
+ assert(dates === expectedDates)
+
+ // 2: With a custom formatter:
+ dates = DateHelper.daysBetween("29Dec16", "03Jan17", "ddMMMyy")
+ expectedDates = List(
+ "29Dec16", "30Dec16", "31Dec16", "01Jan17", "02Jan17", "03Jan17"
+ )
+ assert(dates === expectedDates)
+ }
+
+ test("Reformat Date") {
+ assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "yyMMdd") === "170327")
+ assert(DateHelper.reformatDate("20170327", "yyyyMMdd", "MMddyy") === "032717")
+ }
+
+ test("Next Day") {
+ assert(DateHelper.nextDay("20170310") === "20170311")
+ assert(DateHelper.nextDay("170310", "yyMMdd") === "170311")
+ assert(DateHelper.nextDay("20170310_0000", "yyyyMMdd_HHmm") === "20170311_0000")
+ }
+
+ test("Previous Day") {
+ assert(DateHelper.previousDay("20170310") === "20170309")
+ assert(DateHelper.previousDay("170310", "yyMMdd") === "170309")
+ assert(DateHelper.previousDay("20170310_0000", "yyyyMMdd_HHmm") === "20170309_0000")
+ }
+
+ test("Nbr of Days Between Two Dates") {
+ assert(DateHelper.nbrOfDaysBetween("20170327", "20170327") === 0)
+ assert(DateHelper.nbrOfDaysBetween("20170327", "20170401") === 5)
+ }
+
+ test("Get Date from Timestamp") {
+ assert(DateHelper.getDateFromTimestamp(1496074819L) === "20170529")
+ assert(DateHelper.getDateFromTimestamp(1496074819L, "yyMMdd") === "170529")
+ }
+
+ test("Date it was N Days before Date") {
+ assert(DateHelper.nDaysBeforeDate(3, "20170310") === "20170307")
+ assert(DateHelper.nDaysBeforeDate(5, "170310", "yyMMdd") === "170305")
+ }
+
+ test("Date it will be N Days affter Date") {
+ assert(DateHelper.nDaysAfterDate(3, "20170307") === "20170310")
+ assert(DateHelper.nDaysAfterDate(5, "170305", "yyMMdd") === "170310")
+ }
+}
diff --git a/src/test/scala/com/spark_helper/FieldCheckerTest.scala b/src/test/scala/com/spark_helper/FieldCheckerTest.scala
new file mode 100644
index 0000000..3872d61
--- /dev/null
+++ b/src/test/scala/com/spark_helper/FieldCheckerTest.scala
@@ -0,0 +1,229 @@
+package com.spark_helper
+
+import org.joda.time.format.DateTimeFormat
+
+import org.scalatest.FunSuite
+
+/** Testing facility for field validation helpers.
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+class FieldCheckerTest extends FunSuite {
+
+ test("Integer") {
+ assert(!FieldChecker.isInteger(""))
+ assert(!FieldChecker.isInteger("sdc"))
+ assert(FieldChecker.isInteger("0"))
+ assert(FieldChecker.isInteger("15"))
+ assert(FieldChecker.isInteger("-1"))
+ assert(!FieldChecker.isInteger("-1.5"))
+ assert(!FieldChecker.isInteger("1.5"))
+ assert(FieldChecker.isInteger("0452"))
+ }
+
+ test("Positive Integer") {
+ assert(!FieldChecker.isPositiveInteger(""))
+ assert(!FieldChecker.isPositiveInteger("sdc"))
+ assert(FieldChecker.isPositiveInteger("0"))
+ assert(FieldChecker.isPositiveInteger("15"))
+ assert(!FieldChecker.isPositiveInteger("-1"))
+ assert(!FieldChecker.isPositiveInteger("-1.5"))
+ assert(!FieldChecker.isPositiveInteger("1.5"))
+ assert(FieldChecker.isPositiveInteger("0452"))
+ }
+
+ test("Strictly Positive Integer") {
+ assert(!FieldChecker.isStrictlyPositiveInteger(""))
+ assert(!FieldChecker.isStrictlyPositiveInteger("sdc"))
+ assert(!FieldChecker.isStrictlyPositiveInteger("0"))
+ assert(FieldChecker.isStrictlyPositiveInteger("1"))
+ assert(FieldChecker.isStrictlyPositiveInteger("15"))
+ assert(!FieldChecker.isStrictlyPositiveInteger("-1"))
+ assert(!FieldChecker.isStrictlyPositiveInteger("-1.5"))
+ assert(!FieldChecker.isStrictlyPositiveInteger("1.5"))
+ assert(FieldChecker.isStrictlyPositiveInteger("0452"))
+ }
+
+ test("Float") {
+ assert(!FieldChecker.isFloat(""))
+ assert(!FieldChecker.isFloat("sdc"))
+ assert(FieldChecker.isFloat("0"))
+ assert(FieldChecker.isFloat("15"))
+ assert(FieldChecker.isFloat("-1"))
+ assert(FieldChecker.isFloat("-1.5"))
+ assert(FieldChecker.isFloat("1.5"))
+ }
+
+ test("Positive Float") {
+ assert(!FieldChecker.isPositiveFloat(""))
+ assert(!FieldChecker.isPositiveFloat("sdc"))
+ assert(FieldChecker.isPositiveFloat("0"))
+ assert(FieldChecker.isPositiveFloat("15"))
+ assert(!FieldChecker.isPositiveFloat("-1"))
+ assert(!FieldChecker.isPositiveFloat("-1.5"))
+ assert(FieldChecker.isPositiveFloat("1.5"))
+ }
+
+ test("Strictly Positive Float") {
+ assert(!FieldChecker.isStrictlyPositiveFloat(""))
+ assert(!FieldChecker.isStrictlyPositiveFloat("sdc"))
+ assert(!FieldChecker.isStrictlyPositiveFloat("0"))
+ assert(FieldChecker.isStrictlyPositiveFloat("15"))
+ assert(!FieldChecker.isStrictlyPositiveFloat("-1"))
+ assert(!FieldChecker.isStrictlyPositiveFloat("-1.5"))
+ assert(FieldChecker.isStrictlyPositiveFloat("1.5"))
+ }
+
+ test("YyyyMMdd Date") {
+ assert(FieldChecker.isYyyyMMddDate("20170302"))
+ assert(!FieldChecker.isYyyyMMddDate("20170333"))
+ assert(FieldChecker.isYyyyMMddDate("20170228"))
+ assert(!FieldChecker.isYyyyMMddDate("20170229"))
+ assert(!FieldChecker.isYyyyMMddDate("170228"))
+ assert(!FieldChecker.isYyyyMMddDate(""))
+ assert(!FieldChecker.isYyyyMMddDate("a"))
+ assert(!FieldChecker.isYyyyMMddDate("24JAN17"))
+ }
+
+ test("YyMMdd Date") {
+ assert(FieldChecker.isYyMMddDate("170302"))
+ assert(!FieldChecker.isYyMMddDate("170333"))
+ assert(FieldChecker.isYyMMddDate("170228"))
+ assert(!FieldChecker.isYyMMddDate("170229"))
+ assert(!FieldChecker.isYyMMddDate("20170228"))
+ assert(!FieldChecker.isYyMMddDate(""))
+ assert(!FieldChecker.isYyMMddDate("a"))
+ assert(!FieldChecker.isYyMMddDate("24JAN17"))
+ }
+
+ test("HHmm Time") {
+ assert(FieldChecker.isHHmmTime("1224"))
+ assert(FieldChecker.isHHmmTime("0023"))
+ assert(!FieldChecker.isHHmmTime("2405"))
+ assert(!FieldChecker.isHHmmTime("12:24"))
+ assert(!FieldChecker.isHHmmTime("23"))
+ assert(!FieldChecker.isHHmmTime(""))
+ assert(!FieldChecker.isHHmmTime("a"))
+ assert(!FieldChecker.isHHmmTime("24JAN17"))
+ }
+
+ test("Date versus Provided Format") {
+ assert(FieldChecker.isDateCompliantWithFormat("20170302", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("20170333", "yyyyMMdd"))
+ assert(FieldChecker.isDateCompliantWithFormat("20170228", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("20170229", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("170228", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("a", "yyyyMMdd"))
+ assert(!FieldChecker.isDateCompliantWithFormat("24JAN17", "yyyyMMdd"))
+ }
+
+ test("String is Airport or City Code") {
+
+ // Location alias:
+
+ assert(FieldChecker.isLocationCode("ORY"))
+ assert(FieldChecker.isLocationCode("NCE"))
+ assert(FieldChecker.isLocationCode("JFK"))
+ assert(FieldChecker.isLocationCode("NYC"))
+ assert(FieldChecker.isLocationCode("NYC "))
+
+ assert(!FieldChecker.isLocationCode("ORd"))
+ assert(!FieldChecker.isLocationCode("xxx"))
+
+ assert(!FieldChecker.isLocationCode("FR"))
+ assert(!FieldChecker.isLocationCode("fr"))
+ assert(!FieldChecker.isLocationCode("ORYD"))
+
+ assert(!FieldChecker.isLocationCode(""))
+
+ // Airport alias:
+
+ assert(FieldChecker.isAirportCode("ORY"))
+ assert(FieldChecker.isAirportCode("NCE"))
+ assert(FieldChecker.isAirportCode("JFK"))
+ assert(FieldChecker.isAirportCode("NYC"))
+ assert(FieldChecker.isAirportCode("NYC "))
+
+ assert(!FieldChecker.isAirportCode("ORd"))
+ assert(!FieldChecker.isAirportCode("xxx"))
+
+ assert(!FieldChecker.isAirportCode("FR"))
+ assert(!FieldChecker.isAirportCode("fr"))
+ assert(!FieldChecker.isAirportCode("ORYD"))
+
+ assert(!FieldChecker.isAirportCode(""))
+
+ // City alias:
+
+ assert(FieldChecker.isCityCode("ORY"))
+ assert(FieldChecker.isCityCode("NCE"))
+ assert(FieldChecker.isCityCode("JFK"))
+ assert(FieldChecker.isCityCode("NYC"))
+ assert(FieldChecker.isCityCode("NYC "))
+
+ assert(!FieldChecker.isCityCode("ORd"))
+ assert(!FieldChecker.isCityCode("xxx"))
+
+ assert(!FieldChecker.isCityCode("FR"))
+ assert(!FieldChecker.isCityCode("fr"))
+ assert(!FieldChecker.isCityCode("ORYD"))
+
+ assert(!FieldChecker.isCityCode(""))
+ }
+
+ test("String is Currency Code") {
+
+ assert(FieldChecker.isCurrencyCode("EUR"))
+ assert(FieldChecker.isCurrencyCode("USD"))
+ assert(FieldChecker.isCurrencyCode("USD "))
+
+ assert(!FieldChecker.isCurrencyCode("EUr"))
+ assert(!FieldChecker.isCurrencyCode("xxx"))
+
+ assert(!FieldChecker.isCurrencyCode("EU"))
+ assert(!FieldChecker.isCurrencyCode("eu"))
+ assert(!FieldChecker.isCurrencyCode("EURD"))
+
+ assert(!FieldChecker.isCurrencyCode(""))
+ }
+
+ test("String is Country Code") {
+
+ assert(FieldChecker.isCountryCode("FR"))
+ assert(FieldChecker.isCountryCode("US"))
+ assert(FieldChecker.isCountryCode("US "))
+
+ assert(!FieldChecker.isCountryCode("Us"))
+ assert(!FieldChecker.isCountryCode("us"))
+ assert(!FieldChecker.isCountryCode("USD"))
+
+ assert(!FieldChecker.isCountryCode(""))
+ }
+
+ test("String is Airline Code") {
+
+ assert(FieldChecker.isAirlineCode("AF"))
+ assert(FieldChecker.isAirlineCode("BA"))
+ assert(FieldChecker.isAirlineCode("AA "))
+
+ assert(!FieldChecker.isAirlineCode("Af"))
+ assert(!FieldChecker.isAirlineCode("af"))
+ assert(!FieldChecker.isAirlineCode("AFS"))
+
+ assert(!FieldChecker.isAirlineCode(""))
+ }
+
+ test("String is Class Code") {
+
+ assert(FieldChecker.isClassCode("Y"))
+ assert(FieldChecker.isClassCode("H"))
+ assert(FieldChecker.isClassCode("S "))
+
+ assert(!FieldChecker.isClassCode("s"))
+ assert(!FieldChecker.isClassCode("SS"))
+
+ assert(!FieldChecker.isClassCode(""))
+ }
+}
diff --git a/src/test/scala/com/spark_helper/HdfsHelperTest.scala b/src/test/scala/com/spark_helper/HdfsHelperTest.scala
new file mode 100644
index 0000000..04f138d
--- /dev/null
+++ b/src/test/scala/com/spark_helper/HdfsHelperTest.scala
@@ -0,0 +1,478 @@
+package com.spark_helper
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import java.io.IOException
+
+import org.scalatest.FunSuite
+
+/** Testing facility for the HdfsHelper.
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+class HdfsHelperTest extends FunSuite {
+
+ Logger.getLogger("org").setLevel(Level.OFF)
+ Logger.getLogger("akka").setLevel(Level.OFF)
+
+ test("Delete File/Folder") {
+
+ // Let's try to delete a file:
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/file_to_delete.txt")
+
+ // 1: Let's try to delete it with the deleteFolder method:
+ var messageThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.deleteFolder("src/test/resources/file_to_delete.txt")
+ }
+ var expectedMessage = "To delete a file, prefer using the deleteFile() method."
+ assert(messageThrown.getMessage === expectedMessage)
+ assert(HdfsHelper.fileExists("src/test/resources/file_to_delete.txt"))
+
+ // 2: Let's delete it with the deleteFile method:
+ HdfsHelper.deleteFile("src/test/resources/file_to_delete.txt")
+ assert(!HdfsHelper.fileExists("src/test/resources/file_to_delete.txt"))
+
+ // Let's try to delete a folder:
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_to_delete/file.txt")
+
+ // 3: Let's try to delete it with the deleteFile method:
+ messageThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.deleteFile("src/test/resources/folder_to_delete")
+ }
+ expectedMessage = "To delete a folder, prefer using the deleteFolder() method."
+ assert(messageThrown.getMessage === expectedMessage)
+ assert(HdfsHelper.folderExists("src/test/resources/folder_to_delete"))
+
+ // 4: Let's delete it with the deleteFolder method:
+ HdfsHelper.deleteFolder("src/test/resources/folder_to_delete")
+ assert(!HdfsHelper.folderExists("src/test/resources/folder_to_delete"))
+ }
+
+ test("File/Folder Exists") {
+
+ HdfsHelper.deleteFile("src/test/resources/file_to_check.txt")
+ HdfsHelper.deleteFolder("src/test/resources/folder_to_check")
+
+ // Let's try to check if a file exists:
+
+ assert(!HdfsHelper.fileExists("src/test/resources/file_to_check.txt"))
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/file_to_check.txt")
+
+ // 1: Let's try to check it exists with the folderExists method:
+ var messageThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.folderExists("src/test/resources/file_to_check.txt")
+ }
+ var expectedMessage = (
+ "To check if a file exists, prefer using the fileExists() method."
+ )
+ assert(messageThrown.getMessage === expectedMessage)
+
+ // 2: Let's try to check it exists with the fileExists method:
+ assert(HdfsHelper.fileExists("src/test/resources/file_to_check.txt"))
+
+ // Let's try to check if a folder exists:
+
+ assert(!HdfsHelper.folderExists("src/test/resources/folder_to_check"))
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_to_check/file.txt")
+
+ // 3: Let's try to check it exists with the fileExists method:
+ messageThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.fileExists("src/test/resources/folder_to_check")
+ }
+ expectedMessage = (
+ "To check if a folder exists, prefer using the folderExists() method."
+ )
+ assert(messageThrown.getMessage === expectedMessage)
+
+ // 2: Let's try to check it exists with the folderExists method:
+ assert(HdfsHelper.folderExists("src/test/resources/folder_to_check"))
+
+ HdfsHelper.deleteFile("src/test/resources/file_to_check.txt")
+ HdfsHelper.deleteFolder("src/test/resources/folder_to_check")
+ }
+
+ test("Save Text in HDFS File with the FileSystem API instead of the Spark API") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ HdfsHelper.deleteFile("src/test/resources/folder/small_file.txt")
+
+ val contentToStore = "Hello World\nWhatever"
+
+ HdfsHelper.writeToHdfsFile(
+ contentToStore, "src/test/resources/folder/small_file.txt"
+ )
+
+ assert(HdfsHelper.fileExists("src/test/resources/folder/small_file.txt"))
+
+ val storedContent = sparkContext.textFile(
+ "src/test/resources/folder/small_file.txt"
+ ).collect().sorted.mkString("\n")
+
+ assert(storedContent === contentToStore)
+
+ HdfsHelper.deleteFolder("src/test/resources/folder")
+
+ sparkContext.stop()
+ }
+
+ test("List File Names in Hdfs Folder") {
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/file_1.txt")
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/file_2.csv")
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/folder_2/file_3.txt")
+
+ // 1: Not recursive, names only:
+ var fileNames = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/folder_1"
+ )
+ var expectedFileNames = List("file_1.txt", "file_2.csv")
+ assert(fileNames === expectedFileNames)
+
+ // 2: Not recursive, full paths:
+ fileNames = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/folder_1", onlyName = false
+ )
+ expectedFileNames = List(
+ "src/test/resources/folder_1/file_1.txt",
+ "src/test/resources/folder_1/file_2.csv"
+ )
+ assert(fileNames === expectedFileNames)
+
+ // 3: Recursive, names only:
+ fileNames = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/folder_1", recursive = true
+ )
+ expectedFileNames = List("file_1.txt", "file_2.csv", "file_3.txt")
+ assert(fileNames === expectedFileNames)
+
+ // 4: Recursive, full paths:
+ fileNames = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/folder_1",
+ recursive = true, onlyName = false
+ )
+ expectedFileNames = List(
+ "src/test/resources/folder_1/file_1.txt",
+ "src/test/resources/folder_1/file_2.csv",
+ "src/test/resources/folder_1/folder_2/file_3.txt"
+ )
+ assert(fileNames === expectedFileNames)
+
+ HdfsHelper.deleteFolder("src/test/resources/folder_1")
+ }
+
+ test("List Folder Names in Hdfs Folder") {
+
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/file_1.txt")
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/folder_2/file_2.txt")
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/folder_1/folder_3/file_3.txt")
+
+ val folderNames = HdfsHelper.listFolderNamesInFolder(
+ "src/test/resources/folder_1"
+ )
+ val expectedFolderNames = List("folder_2", "folder_3")
+
+ assert(folderNames === expectedFolderNames)
+
+ HdfsHelper.deleteFolder("src/test/resources/folder_1")
+ }
+
+ test("Move File") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // Let's remove possible previous stuff:
+ HdfsHelper.deleteFile("src/test/resources/some_file.txt")
+ HdfsHelper.deleteFile("src/test/resources/renamed_file.txt")
+
+ // Let's create the file to rename:
+ HdfsHelper.writeToHdfsFile("whatever", "src/test/resources/some_file.txt")
+
+ // 1: Let's try to move the file onto a path where a file already exists,
+ // without the overwrite option:
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_file.txt"))
+ assert(!HdfsHelper.fileExists("src/test/resources/renamed_file.txt"))
+
+ // Let's create the existing file where we want to move our file:
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/renamed_file.txt")
+
+ // Let's rename the file to the path where a file already exists:
+ val ioExceptionThrown = intercept[IOException] {
+ HdfsHelper.moveFile(
+ "src/test/resources/some_file.txt",
+ "src/test/resources/renamed_file.txt"
+ )
+ }
+ var expectedMessage = (
+ "A file already exists at target location " +
+ "src/test/resources/renamed_file.txt"
+ )
+ assert(ioExceptionThrown.getMessage === expectedMessage)
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_file.txt"))
+ assert(HdfsHelper.fileExists("src/test/resources/renamed_file.txt"))
+
+ HdfsHelper.deleteFile("src/test/resources/renamed_file.txt")
+
+ // 2: Let's fail to move the file with the moveFolder() method:
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_file.txt"))
+ assert(!HdfsHelper.fileExists("src/test/resources/renamed_file.txt"))
+
+ // Let's rename the file:
+ val illegalArgExceptionThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.moveFolder(
+ "src/test/resources/some_file.txt",
+ "src/test/resources/renamed_file.txt"
+ )
+ }
+ expectedMessage = "To move a file, prefer using the moveFile() method."
+ assert(illegalArgExceptionThrown.getMessage === expectedMessage)
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_file.txt"))
+ assert(!HdfsHelper.fileExists("src/test/resources/renamed_file.txt"))
+
+ // 3: Let's successfully move the file with the moveFile() method:
+
+ // Let's rename the file:
+ HdfsHelper.moveFile(
+ "src/test/resources/some_file.txt",
+ "src/test/resources/renamed_file.txt"
+ )
+
+ assert(!HdfsHelper.fileExists("src/test/resources/some_file.txt"))
+ assert(HdfsHelper.fileExists("src/test/resources/renamed_file.txt"))
+
+ val newContent = sparkContext.textFile(
+ "src/test/resources/renamed_file.txt"
+ ).collect
+
+ assert(Array("whatever") === newContent)
+
+ HdfsHelper.deleteFile("src/test/resources/renamed_file.txt")
+
+ sparkContext.stop()
+ }
+
+ test("Move Folder") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // Let's remove possible previous stuff:
+ HdfsHelper.deleteFolder("src/test/resources/some_folder_to_move")
+ HdfsHelper.deleteFolder("src/test/resources/renamed_folder")
+
+ // Let's create the folder to rename:
+ HdfsHelper.writeToHdfsFile(
+ "whatever", "src/test/resources/some_folder_to_move/file_1.txt"
+ )
+ HdfsHelper.writeToHdfsFile(
+ "something", "src/test/resources/some_folder_to_move/file_2.txt"
+ )
+
+ // 1: Let's fail to move the folder with the moveFile() method:
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_folder_to_move/file_1.txt"))
+ assert(HdfsHelper.fileExists("src/test/resources/some_folder_to_move/file_2.txt"))
+ assert(!HdfsHelper.folderExists("src/test/resources/renamed_folder"))
+
+ // Let's rename the folder:
+ val messageThrown = intercept[IllegalArgumentException] {
+ HdfsHelper.moveFile(
+ "src/test/resources/some_folder_to_move",
+ "src/test/resources/renamed_folder"
+ )
+ }
+ val expectedMessage = "To move a folder, prefer using the moveFolder() method."
+ assert(messageThrown.getMessage === expectedMessage)
+
+ assert(HdfsHelper.fileExists("src/test/resources/some_folder_to_move/file_1.txt"))
+ assert(HdfsHelper.fileExists("src/test/resources/some_folder_to_move/file_2.txt"))
+ assert(!HdfsHelper.folderExists("src/test/resources/renamed_folder"))
+
+ // 2: Let's successfully move the folder with the moveFolder() method:
+
+ // Let's rename the folder:
+ HdfsHelper.moveFolder(
+ "src/test/resources/some_folder_to_move",
+ "src/test/resources/renamed_folder"
+ )
+
+ assert(!HdfsHelper.folderExists("src/test/resources/some_folder_to_move"))
+ assert(HdfsHelper.fileExists("src/test/resources/renamed_folder/file_1.txt"))
+ assert(HdfsHelper.fileExists("src/test/resources/renamed_folder/file_2.txt"))
+
+ val newContent = sparkContext.textFile(
+ "src/test/resources/renamed_folder"
+ ).collect().sorted
+
+ assert(newContent === Array("something", "whatever"))
+
+ HdfsHelper.deleteFolder("src/test/resources/renamed_folder")
+
+ sparkContext.stop()
+ }
+
+ test("Append Header and Footer to File") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // 1: Without the tmp/working folder:
+
+ HdfsHelper.deleteFile("src/test/resources/header_footer_file.txt")
+
+ // Let's create the file for which to add header and footer:
+ HdfsHelper.writeToHdfsFile(
+ "whatever\nsomething else\n",
+ "src/test/resources/header_footer_file.txt"
+ )
+
+ HdfsHelper.appendHeaderAndFooter(
+ "src/test/resources/header_footer_file.txt", "my_header", "my_footer"
+ )
+
+ var newContent = sparkContext.textFile(
+ "src/test/resources/header_footer_file.txt"
+ ).collect.mkString("\n")
+ var expectedNewContent = (
+ "my_header\n" +
+ "whatever\n" +
+ "something else\n" +
+ "my_footer"
+ )
+
+ assert(newContent === expectedNewContent)
+
+ HdfsHelper.deleteFile("src/test/resources/header_footer_file.txt")
+
+ // 2: With the tmp/working folder:
+
+ // Let's create the file for which to add header and footer:
+ HdfsHelper.writeToHdfsFile(
+ "whatever\nsomething else\n",
+ "src/test/resources/header_footer_file.txt"
+ )
+
+ HdfsHelper.appendHeaderAndFooter(
+ "src/test/resources/header_footer_file.txt", "my_header", "my_footer",
+ workingFolderPath = "src/test/resources/header_footer_tmp"
+ )
+
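+ // The working folder should still exist, while the intermediate tmp file
+ // should have been cleaned up: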
+ assert(HdfsHelper.folderExists("src/test/resources/header_footer_tmp"))
+ assert(!HdfsHelper.fileExists("src/test/resources/header_footer_tmp/xml.tmp"))
+
+ newContent = sparkContext.textFile(
+ "src/test/resources/header_footer_file.txt"
+ ).collect.mkString("\n")
+ expectedNewContent = (
+ "my_header\n" +
+ "whatever\n" +
+ "something else\n" +
+ "my_footer"
+ )
+
+ assert(newContent === expectedNewContent)
+
+ HdfsHelper.deleteFile("src/test/resources/header_footer_file.txt")
+ HdfsHelper.deleteFolder("src/test/resources/header_footer_tmp")
+
+ sparkContext.stop()
+ }
+
+ test("Validate Xml Hdfs File with Xsd") {
+
+ // 1: Valid xml:
+ HdfsHelper.deleteFile("src/test/resources/xml_file.txt")
+ HdfsHelper.writeToHdfsFile(
+ "<Customer>\n" +
+ " <Age>24</Age>\n" +
+ " <Address>34 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>",
+ "src/test/resources/xml_file.txt"
+ )
+
+ var xsdFile = getClass.getResource("/some_xml.xsd")
+
+ var isValid = HdfsHelper.isHdfsXmlCompliantWithXsd(
+ "src/test/resources/xml_file.txt", xsdFile
+ )
+ assert(isValid)
+
+ // 2: Invalid xml:
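+ // (the age value below is not a number, so the xsd check should
+ // presumably reject it):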
+ HdfsHelper.deleteFile("src/test/resources/xml_file.txt")
+ HdfsHelper.writeToHdfsFile(
+ "<Customer>\n" +
+ " <Age>trente</Age>\n" +
+ " <Address>34 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>",
+ "src/test/resources/xml_file.txt"
+ )
+
+ xsdFile = getClass.getResource("/some_xml.xsd")
+
+ isValid = HdfsHelper.isHdfsXmlCompliantWithXsd(
+ "src/test/resources/xml_file.txt", xsdFile
+ )
+ assert(!isValid)
+
+ HdfsHelper.deleteFile("src/test/resources/xml_file.txt")
+ }
+
+ test("Load Typesafe Config from Hdfs") {
+
+ HdfsHelper.deleteFile("src/test/resources/typesafe_config.conf")
+ HdfsHelper.writeToHdfsFile(
+ "config {\n" +
+ " something = something_else\n" +
+ "}",
+ "src/test/resources/typesafe_config.conf"
+ )
+
+ val config = HdfsHelper.loadTypesafeConfigFromHdfs(
+ "src/test/resources/typesafe_config.conf"
+ )
+
+ assert(config.getString("config.something") === "something_else")
+
+ HdfsHelper.deleteFile("src/test/resources/typesafe_config.conf")
+ }
+
+ test("Load Xml File from Hdfs") {
+
+ HdfsHelper.deleteFile("src/test/resources/folder/xml_to_load.xml")
+
+ HdfsHelper.writeToHdfsFile(
+ "<toptag>\n" +
+ " <sometag value=\"something\">whatever</sometag>\n" +
+ "</toptag>",
+ "src/test/resources/folder/xml_to_load.xml"
+ )
+
+ val xmlContent = HdfsHelper.loadXmlFileFromHdfs(
+ "src/test/resources/folder/xml_to_load.xml"
+ )
+
+ assert((xmlContent \ "sometag" \ "@value").text === "something")
+ assert((xmlContent \ "sometag").text === "whatever")
+
+ HdfsHelper.deleteFolder("src/test/resources/folder/")
+ }
+}
diff --git a/src/test/scala/com/spark_helper/SparkHelperTest.scala b/src/test/scala/com/spark_helper/SparkHelperTest.scala
new file mode 100644
index 0000000..5130e57
--- /dev/null
+++ b/src/test/scala/com/spark_helper/SparkHelperTest.scala
@@ -0,0 +1,271 @@
+package com.spark_helper
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import org.scalatest.FunSuite
+
+/** Testing facility for the SparkHelper.
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+class SparkHelperTest extends FunSuite {
+
+ Logger.getLogger("org").setLevel(Level.OFF)
+ Logger.getLogger("akka").setLevel(Level.OFF)
+
+ test("Save as Single Text File") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // 1: Without an intermediate working dir:
+
+ var repartitionedDataToStore = sparkContext.parallelize(Array(
+ "data_a", "data_b", "data_c"
+ )).repartition(3)
+
+ HdfsHelper.deleteFile("src/test/resources/single_text_file.txt")
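+ // saveAsSingleTextFile should merge the 3 partitions into a single plain
+ // text file at the exact given path: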
+ SparkHelper.saveAsSingleTextFile(
+ repartitionedDataToStore, "src/test/resources/single_text_file.txt"
+ )
+
+ var singleFileStoredData = sparkContext.textFile(
+ "src/test/resources/single_text_file.txt"
+ ).collect().sorted
+ assert(singleFileStoredData === Array("data_a", "data_b", "data_c"))
+
+ HdfsHelper.deleteFile("src/test/resources/single_text_file.txt")
+
+ // 2: With an intermediate working dir:
+ // Note that this also tests moving the single file into a folder which
+ // doesn't exist yet.
+
+ repartitionedDataToStore = sparkContext.parallelize(Array(
+ "data_a", "data_b", "data_c"
+ )).repartition(3)
+
+ HdfsHelper.deleteFile("src/test/resources/folder/single_text_file.txt")
+ HdfsHelper.deleteFolder("src/test/resources/folder")
+ SparkHelper.saveAsSingleTextFile(
+ repartitionedDataToStore, "src/test/resources/folder/single_text_file.txt",
+ workingFolder = "src/test/resources/tmp"
+ )
+ assert(HdfsHelper.fileExists("src/test/resources/folder/single_text_file.txt"))
+
+ singleFileStoredData = sparkContext.textFile(
+ "src/test/resources/folder/single_text_file.txt"
+ ).collect().sorted
+ assert(singleFileStoredData === Array("data_a", "data_b", "data_c"))
+
+ HdfsHelper.deleteFolder("src/test/resources/folder")
+ HdfsHelper.deleteFolder("src/test/resources/tmp")
+
+ sparkContext.stop()
+ }
+
+ test("Read Text File with Specific Record Delimiter") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // 1: Let's read a file where each record begins with a line starting
+ // with 3, followed by lines starting with 4:
+
+ HdfsHelper.deleteFile("src/test/resources/some_weird_format.txt")
+
+ val textContent = (
+ "3 first line of the first record\n" +
+ "4 another line of the first record\n" +
+ "4 and another one for the first record\n" +
+ "3 first line of the second record\n" +
+ "3 first line of the third record\n" +
+ "4 another line for the third record"
+ )
+
+ HdfsHelper.writeToHdfsFile(
+ textContent, "src/test/resources/some_weird_format.txt"
+ )
+
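+ // The "\n3" delimiter splits the text each time a line starts with "3";
+ // since the delimiter is consumed, records after the first one lose their
+ // leading "3" (see the expected records below):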
+ var computedRecords = SparkHelper.textFileWithDelimiter(
+ "src/test/resources/some_weird_format.txt", sparkContext, "\n3"
+ ).collect()
+
+ var expectedRecords = Array(
+ (
+ "3 first line of the first record\n" +
+ "4 another line of the first record\n" +
+ "4 and another one for the first record"
+ ),
+ " first line of the second record",
+ (
+ " first line of the third record\n" +
+ "4 another line for the third record"
+ )
+ )
+
+ assert(computedRecords === expectedRecords)
+
+ HdfsHelper.deleteFile("src/test/resources/some_weird_format.txt")
+
+ // 2: Let's read an xml file:
+
+ HdfsHelper.deleteFile("src/test/resources/some_basic_xml.xml")
+
+ val xmlTextContent = (
+ "<Customers>\n" +
+ "<Customer>\n" +
+ "<Address>34 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>\n" +
+ "<Customer>\n" +
+ "<Address>12 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>\n" +
+ "</Customers>"
+ )
+
+ HdfsHelper.writeToHdfsFile(
+ xmlTextContent, "src/test/resources/some_basic_xml.xml"
+ )
+
+ computedRecords = SparkHelper.textFileWithDelimiter(
+ "src/test/resources/some_basic_xml.xml", sparkContext, "<Customer>\n"
+ ).collect()
+
+ expectedRecords = Array(
+ "<Customers>\n",
+ (
+ "<Address>34 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>\n"
+ ),
+ (
+ "<Address>12 thingy street, someplace, sometown</Address>\n" +
+ "</Customer>\n" +
+ "</Customers>"
+ )
+ )
+
+ assert(computedRecords === expectedRecords)
+
+ HdfsHelper.deleteFile("src/test/resources/some_basic_xml.xml")
+
+ sparkContext.stop()
+ }
+
+ test("Save as Text File by Key") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
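+ // saveAsTextFileByKey is expected to write one text file per key (named
+ // after the key), plus Hadoop's _SUCCESS marker: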
+ HdfsHelper.deleteFolder("src/test/resources/key_value_storage")
+
+ val someKeyValueRdd = sparkContext.parallelize[(String, String)](
+ Array(
+ ("key_1", "value_a"),
+ ("key_1", "value_b"),
+ ("key_2", "value_c"),
+ ("key_2", "value_b"),
+ ("key_2", "value_d"),
+ ("key_3", "value_a"),
+ ("key_3", "value_b")
+ )
+ )
+
+ SparkHelper.saveAsTextFileByKey(
+ someKeyValueRdd, "src/test/resources/key_value_storage", 3
+ )
+
+ // The folder key_value_storage has been created:
+ assert(HdfsHelper.folderExists("src/test/resources/key_value_storage"))
+
+ // And it contains one file per key:
+ val generatedKeyFiles = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/key_value_storage"
+ )
+ val expectedKeyFiles = List("_SUCCESS", "key_1", "key_2", "key_3")
+ assert(generatedKeyFiles === expectedKeyFiles)
+
+ val valuesForKey1 = sparkContext.textFile(
+ "src/test/resources/key_value_storage/key_1"
+ ).collect().sorted
+ assert(valuesForKey1 === Array("value_a", "value_b"))
+
+ val valuesForKey2 = sparkContext.textFile(
+ "src/test/resources/key_value_storage/key_2"
+ ).collect().sorted
+ assert(valuesForKey2 === Array("value_b", "value_c", "value_d"))
+
+ val valuesForKey3 = sparkContext.textFile(
+ "src/test/resources/key_value_storage/key_3"
+ ).collect().sorted
+ assert(valuesForKey3 === Array("value_a", "value_b"))
+
+ HdfsHelper.deleteFolder("src/test/resources/key_value_storage")
+
+ sparkContext.stop()
+ }
+
+ test("Decrease Coalescence Level") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ HdfsHelper.deleteFolder("src/test/resources/re_coalescence_test_input")
+ HdfsHelper.deleteFolder("src/test/resources/re_coalescence_test_output")
+
+ // Let's create the input folder with a high coalescence level (3 files):
+ SparkHelper.saveAsSingleTextFile(
+ sparkContext.parallelize[String](Array(
+ "data_1_a", "data_1_b", "data_1_c"
+ )),
+ "src/test/resources/re_coalescence_test_input/input_file_1"
+ )
+ SparkHelper.saveAsSingleTextFile(
+ sparkContext.parallelize[String](Array(
+ "data_2_a", "data_2_b"
+ )),
+ "src/test/resources/re_coalescence_test_input/input_file_2"
+ )
+ SparkHelper.saveAsSingleTextFile(
+ sparkContext.parallelize[String](Array(
+ "data_3_a", "data_3_b", "data_3_c"
+ )),
+ "src/test/resources/re_coalescence_test_input/input_file_3"
+ )
+
+ // Let's decrease the coalescence level in order to only have 2 files:
+ SparkHelper.decreaseCoalescence(
+ "src/test/resources/re_coalescence_test_input",
+ "src/test/resources/re_coalescence_test_output",
+ 2, sparkContext
+ )
+
+ // And we check that the output contains two part files (plus the _SUCCESS marker):
+ val outputFileList = HdfsHelper.listFileNamesInFolder(
+ "src/test/resources/re_coalescence_test_output"
+ )
+ val expectedFileList = List("_SUCCESS", "part-00000", "part-00001")
+ assert(outputFileList === expectedFileList)
+
+ // And that all input data is in the output:
+ val outputData = sparkContext.textFile(
+ "src/test/resources/re_coalescence_test_output"
+ ).collect.sorted
+ val expectedOutputData = Array(
+ "data_1_a", "data_1_b", "data_1_c", "data_2_a", "data_2_b",
+ "data_3_a", "data_3_b", "data_3_c"
+ )
+ assert(outputData === expectedOutputData)
+
+ HdfsHelper.deleteFolder("src/test/resources/re_coalescence_test_output")
+
+ sparkContext.stop()
+ }
+}
diff --git a/src/test/scala/com/spark_helper/monitoring/MonitorTest.scala b/src/test/scala/com/spark_helper/monitoring/MonitorTest.scala
new file mode 100644
index 0000000..d3aff1e
--- /dev/null
+++ b/src/test/scala/com/spark_helper/monitoring/MonitorTest.scala
@@ -0,0 +1,279 @@
+package com.spark_helper.monitoring
+
+import com.spark_helper.DateHelper
+import com.spark_helper.HdfsHelper
+
+import org.apache.spark.SparkContext
+import org.apache.spark.SparkConf
+
+import java.security.InvalidParameterException
+
+import org.apache.log4j.Logger
+import org.apache.log4j.Level
+
+import org.scalatest.FunSuite
+
+/** Testing facility for the Monitor facility.
+ *
+ * @author Xavier Guihot
+ * @since 2017-02
+ */
+class MonitorTest extends FunSuite {
+
+ Logger.getLogger("org").setLevel(Level.OFF)
+ Logger.getLogger("akka").setLevel(Level.OFF)
+
+ test("Basic Monitoring Testing") {
+
+ var monitor = new Monitor()
+ assert(monitor.isSuccess())
+ var report = removeTimeStamps(monitor.getReport())
+ assert(report === "[..:..] Begining\n")
+
+ // Creation of the Monitor object with additional info:
+ monitor = new Monitor(
+ "Processing of whatever", "xguihot@gmail.com",
+ "Documentation: https://github.com/xavierguihot/spark_helper"
+ )
+ report = removeTimeStamps(monitor.getReport())
+ var expectedReport = (
+ " Processing of whatever\n" +
+ "\n" +
+ "Point of contact: xguihot@gmail.com\n" +
+ "Documentation: https://github.com/xavierguihot/spark_helper\n" +
+ "[..:..] Begining\n"
+ )
+ assert(report === expectedReport)
+
+ // Simple text update without success modification:
+ monitor = new Monitor()
+ monitor.updateReport("My First Stage")
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] My First Stage\n"
+ )
+ assert(report === expectedReport)
+
+ monitor.updateReport("My Second Stage")
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] My First Stage\n" +
+ "[..:..-..:..] My Second Stage\n"
+ )
+ assert(report === expectedReport)
+
+ // Update report with success or failure:
+ monitor = new Monitor()
+ monitor.updateReportWithSuccess("My First Stage")
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] My First Stage: success\n"
+ )
+ assert(report === expectedReport)
+ assert(monitor.isSuccess())
+ // Failure:
+ monitor.updateReportWithFailure("My Second Stage")
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] My First Stage: success\n" +
+ "[..:..-..:..] My Second Stage: failed\n"
+ )
+ assert(report === expectedReport)
+ assert(!monitor.isSuccess())
+ // A success after a failure, which doesn't overwrite the failure:
+ monitor.updateReportWithSuccess("My Third Stage")
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] My First Stage: success\n" +
+ "[..:..-..:..] My Second Stage: failed\n" +
+ "[..:..-..:..] My Third Stage: success\n"
+ )
+ assert(report === expectedReport)
+ assert(!monitor.isSuccess())
+ }
+
+ test("Add Error Stack Trace to Report") {
+ val monitor = new Monitor()
+ try {
+ "a".toInt
+ } catch {
+ case nfe: NumberFormatException => {
+ monitor.updateReportWithError(
+ nfe, "Parse to integer", "my diagnostic"
+ )
+ }
+ }
+ // Note: we drop the stack trace from the comparison because it depends
+ // on the Java/Scala version, which admittedly limits what this test covers.
+ val report = removeTimeStamps(monitor.getReport()).split("\n").take(3).mkString("\n")
+ val expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] Parse to integer: failed\n" +
+ " Diagnostic: my diagnostic"
+ )
+ assert(report === expectedReport)
+ }
+
+ test("Simple Tests") {
+
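+ // Each Test bundles a KPI name, its value, a threshold requirement
+ // ("inferior to", "superior to" or "equal to"), the threshold itself and
+ // a unit ("pct" or "nbr"):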
+ // 1: List of tests:
+ var monitor = new Monitor()
+ var success = monitor.updateByKpisValidation(
+ List(
+ new Test("pctOfWhatever", 0.06f, "inferior to", 0.1f, "pct"),
+ new Test("pctOfSomethingElse", 0.27f, "superior to", 0.3f, "pct"),
+ new Test("someNbr", 1235f, "equal to", 1235f, "nbr")
+ ),
+ "Tests for whatever"
+ )
+
+ assert(!success)
+ assert(!monitor.isSuccess())
+
+ var report = removeTimeStamps(monitor.getReport())
+ var expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] Tests for whatever: failed\n" +
+ " KPI: pctOfWhatever\n" +
+ " Value: 0.06%\n" +
+ " Must be inferior to 0.1%\n" +
+ " Validated: true\n" +
+ " KPI: pctOfSomethingElse\n" +
+ " Value: 0.27%\n" +
+ " Must be superior to 0.3%\n" +
+ " Validated: false\n" +
+ " KPI: someNbr\n" +
+ " Value: 1235.0\n" +
+ " Must be equal to 1235.0\n" +
+ " Validated: true\n"
+ )
+ assert(report === expectedReport)
+
+ // 2: Single test:
+ monitor = new Monitor()
+ success = monitor.updateByKpiValidation(
+ new Test("someNbr", 55e6f, "superior to", 50e6f, "nbr"),
+ "Tests for whatever"
+ )
+
+ assert(success)
+ assert(monitor.isSuccess())
+
+ report = removeTimeStamps(monitor.getReport())
+ expectedReport = (
+ "[..:..] Begining\n" +
+ "[..:..-..:..] Tests for whatever: success\n" +
+ " KPI: someNbr\n" +
+ " Value: 5.5E7\n" +
+ " Must be superior to 5.0E7\n" +
+ " Validated: true\n"
+ )
+ assert(report === expectedReport)
+ }
+
+ test("Incorrect User Inputs for Test Objects") {
+ val messageThrown = intercept[InvalidParameterException] {
+ new Test("pctOfWhatever", 0.06f, "skdjbv", 0.1f, "pct")
+ }
+ val expectedMessage = (
+ "The threshold type can only be \"superior to\", " +
+ "\"inferior to\"or \"equal to\", but you used: \"skdjbv\"."
+ )
+ assert(messageThrown.getMessage === expectedMessage)
+ }
+
+ test("Save Report") {
+
+ val sparkContext = new SparkContext(
+ new SparkConf().setAppName("Spark").setMaster("local[2]")
+ )
+
+ // We remove previous data:
+ HdfsHelper.deleteFolder("src/test/resources/logs")
+
+ val monitor = new Monitor(
+ "My Processing", "xguihot@gmail.com",
+ "Documentation: https://github.com/xavierguihot/spark_helper"
+ )
+ monitor.updateReport("Doing something: success")
+
+ monitor.saveReport("src/test/resources/logs")
+
+ val reportStoredLines = sparkContext.textFile(
+ "src/test/resources/logs/*.log.success"
+ ).collect().toList.mkString("\n")
+ val expectedReport = (
+ " My Processing\n" +
+ "\n" +
+ "Point of contact: xguihot@gmail.com\n" +
+ "Documentation: https://github.com/xavierguihot/spark_helper\n" +
+ "[..:..] Begining\n" +
+ "[..:..-..:..] Doing something: success\n" +
+ "[..:..] Duration: 00:00:00"
+ )
+ assert(removeTimeStamps(reportStoredLines) === expectedReport)
+
+ sparkContext.stop()
+ }
+
+ test("Save Report with Purge") {
+
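+ // saveReport with purgeLogs = true should delete log files older than the
+ // purge window (7 days here) and replace the previous current.* status file: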
+ HdfsHelper.deleteFolder("src/test/resources/logs")
+
+ // Let's create an outdated log file (12 days before):
+ val outdatedDate = DateHelper.nDaysBefore(12, "yyyyMMdd")
+ val outdatedLogFile = outdatedDate + ".log.success"
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/logs/" + outdatedLogFile)
+ // Let's create a log file not old enough to be purged (3 days before):
+ val notOutdatedDate = DateHelper.nDaysBefore(3, "yyyyMMdd")
+ val notOutdatedLogFile = notOutdatedDate + ".log.failed"
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/logs/" + notOutdatedLogFile)
+
+ // Let's create the previous current.failed status log file:
+ HdfsHelper.writeToHdfsFile("", "src/test/resources/logs/current.failed")
+
+ // And we save the new report with the purge option:
+ val monitor = new Monitor()
+ monitor.saveReport(
+ "src/test/resources/logs", purgeLogs = true, purgeWindow = 7
+ )
+
+ assert(!HdfsHelper.fileExists("src/test/resources/logs/" + outdatedLogFile))
+ assert(HdfsHelper.fileExists("src/test/resources/logs/" + notOutdatedLogFile))
+ assert(!HdfsHelper.fileExists("src/test/resources/logs/current.failed"))
+ assert(HdfsHelper.fileExists("src/test/resources/logs/current.success"))
+
+ HdfsHelper.deleteFolder("src/test/resources/logs")
+ }
+
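+ /** Replaces "[HH:MM]" and "[HH:MM-HH:MM]" time stamps with "[..:..]" and
+ * "[..:..-..:..]" placeholders so that reports can be compared
+ * deterministically. */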
+ private def removeTimeStamps(logs: String): String = {
+
+ var timeStampFreeLogs = logs
+ var index = timeStampFreeLogs.indexOf("[")
+
+ while (index >= 0) {
+
+ if (timeStampFreeLogs(index + 6) == ']') // [12:15]
+ timeStampFreeLogs = (
+ timeStampFreeLogs.substring(0, index) +
+ "[..:..]" +
+ timeStampFreeLogs.substring(index + 7)
+ )
+ else if (timeStampFreeLogs(index + 12) == ']') // [12:15-12:23]
+ timeStampFreeLogs = (
+ timeStampFreeLogs.substring(0, index) +
+ "[..:..-..:..]" +
+ timeStampFreeLogs.substring(index + 13)
+ )
+
+ index = timeStampFreeLogs.indexOf("[", index + 1)
+ }
+
+ timeStampFreeLogs
+ }
+}