Commit
Add option to live store the monitoring report for each update during the job run
xavierguihot committed Nov 15, 2017
1 parent c47844b commit 6f2460f
Showing 7 changed files with 88 additions and 29 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -5,7 +5,7 @@
## Overview


Version: 1.0.6
Version: 1.0.7

API Scaladoc: [SparkHelper](http://xavierguihot.github.io/spark_helper/#com.spark_helper.SparkHelper$)

@@ -112,7 +112,7 @@ for a cool example.

With sbt, just add this one line to your build.sbt:

libraryDependencies += "spark_helper" % "spark_helper" % "1.0.6" from "https://github.com/xavierguihot/spark_helper/releases/download/v1.0.6/spark_helper-1.0.6.jar"
libraryDependencies += "spark_helper" % "spark_helper" % "1.0.7" from "https://github.com/xavierguihot/spark_helper/releases/download/v1.0.7/spark_helper-1.0.7.jar"


## Building the project:
2 changes: 1 addition & 1 deletion build.sbt
@@ -1,6 +1,6 @@
name := "spark_helper"

version := "1.0.6"
version := "1.0.7"

scalaVersion := "2.10.4"

2 changes: 1 addition & 1 deletion docs/com/spark_helper/HdfsHelper$.html
@@ -836,7 +836,7 @@ <h4 class="signature">
</span>
</h4>
<p class="shortcomment cmt">Saves text in a file when content is too small to really require an RDD.</p><div class="fullcomment"><div class="comment cmt"><p>Saves text in a file when content is too small to really require an RDD.</p><p>Please only consider this way of storing data when the data set is small
enough.</p><p>Overwrites the file is it already existed.</p><pre>HdfsHelper.writeToHdfsFile(<span class="lit">"some\nrelatively small\ntext"</span>, <span class="lit">"/some/hdfs/file/path.txt"</span>)</pre></div><dl class="paramcmts block"><dt class="param">content</dt><dd class="cmt"><p>the string to write in the file (you can provide a string
enough.</p><p>Overwrites the file if it already exists.</p><pre>HdfsHelper.writeToHdfsFile(<span class="lit">"some\nrelatively small\ntext"</span>, <span class="lit">"/some/hdfs/file/path.txt"</span>)</pre></div><dl class="paramcmts block"><dt class="param">content</dt><dd class="cmt"><p>the string to write in the file (you can provide a string
with \n in order to write several lines).</p></dd><dt class="param">filePath</dt><dd class="cmt"><p>the path of the file in which to write the content
</p></dd></dl></div>
</li></ol>
14 changes: 10 additions & 4 deletions docs/com/spark_helper/monitoring/Monitor.html
@@ -119,7 +119,13 @@ <h4 id="signature" class="signature">
fails to send the report of the error by email. Or simply to keep track of
historical kpis, processing times, ...</p><p>This is not supposed to be updated from within a Spark pipeline
(actions/transformations) but rather from the orchestration of the
pipelines.</p><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
pipelines.</p><p>When the Monitor object is instantiated with the optional
&quot;logFolder&quot; parameter, you don't need to wait for the end of the job (the
call to saveReport()) to see what's going on: every report update is
immediately saved to the file logFolder/current.ongoing. This gives a live
view of the job, and even if the job is force-killed you can still easily
see what happened.</p><p>Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
/main/scala/com/spark_helper/monitoring/Monitor.scala">Monitor</a>
</p></div><dl class="attributes block"> <dt>Since</dt><dd><p>2017-02
</p></dd></dl><div class="toggleContainer block">
@@ -163,23 +169,23 @@ <h4 id="signature" class="signature">
<div id="constructors" class="members">
<h3>Instance Constructors</h3>
<ol><li name="com.spark_helper.monitoring.Monitor#&lt;init&gt;" visbl="pub" data-isabs="false" fullComment="yes" group="Ungrouped">
<a id="&lt;init&gt;(reportTitle:String,pointOfContact:String,additionalInfo:String):com.spark_helper.monitoring.Monitor"></a>
<a id="&lt;init&gt;(reportTitle:String,pointOfContact:String,additionalInfo:String,logFolder:String):com.spark_helper.monitoring.Monitor"></a>
<a id="&lt;init&gt;:Monitor"></a>
<h4 class="signature">
<span class="modifier_kind">
<span class="modifier"></span>
<span class="kind">new</span>
</span>
<span class="symbol">
<span class="name">Monitor</span><span class="params">(<span name="reportTitle">reportTitle: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>, <span name="pointOfContact">pointOfContact: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>, <span name="additionalInfo">additionalInfo: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>)</span>
<span class="name">Monitor</span><span class="params">(<span name="reportTitle">reportTitle: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>, <span name="pointOfContact">pointOfContact: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>, <span name="additionalInfo">additionalInfo: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>, <span name="logFolder">logFolder: <span class="extype" name="scala.Predef.String">String</span> = <span class="symbol">&quot;&quot;</span></span>)</span>
</span>
</h4>
<p class="shortcomment cmt">Creates a Monitor object.</p><div class="fullcomment"><div class="comment cmt"><p>Creates a Monitor object.</p><p>Creating the Monitor object like this:</p><pre><span class="kw">new</span> Monitor(<span class="lit">"My Spark Job Title"</span>, <span class="lit">"someone@box.com"</span>, <span class="lit">"Whatever pretty description."</span>)</pre><p>will result in a report starting like this:</p><pre>My Spark Job Title

Point of contact: someone@box.com
Whatever pretty description.
[..:..] Begining</pre></div><dl class="paramcmts block"><dt class="param">reportTitle</dt><dd class="cmt"><p>(optional) what's output as the first line of the report</p></dd><dt class="param">pointOfContact</dt><dd class="cmt"><p>(optional) the persons in charge of the job</p></dd><dt class="param">additionalInfo</dt><dd class="cmt"><p>(optional) anything you want written at the beginning
of your report.
of your report.</p></dd><dt class="param">logFolder</dt><dd class="cmt"><p>(optional) the folder in which this report is stored
</p></dd></dl></div>
</li></ol>
</div>
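The timestamped report lines documented above can be sketched as a tiny standalone helper (the object and method names here are illustrative, not part of the Monitor API): each update is prefixed with the time window elapsed since the previous update, giving lines such as `[10:35-10:41] Doing something: success`.

```scala
// Standalone sketch (illustrative, not the Monitor class itself) of the
// report line format shown in the scaladoc above: each update is prefixed
// with the "[previous-now]" time window since the last update.
object ReportFormatSketch {
  // Builds one report line, e.g. "[10:35-10:36] Doing something: success"
  def formatUpdate(previous: String, now: String, text: String): String =
    "[" + previous + "-" + now + "] " + text
}
```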
2 changes: 1 addition & 1 deletion src/main/scala/com/spark_helper/HdfsHelper.scala
@@ -271,7 +271,7 @@ object HdfsHelper extends Serializable {
* Please only consider this way of storing data when the data set is small
* enough.
*
* Overwrites the file is it already existed.
* Overwrites the file if it already exists.
*
* {{{ HdfsHelper.writeToHdfsFile("some\nrelatively small\ntext", "/some/hdfs/file/path.txt") }}}
*
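The overwrite semantics documented for writeToHdfsFile can be sketched with a local-filesystem analogue (an assumption for illustration; the real method writes through the HDFS API): `java.nio.file.Files.write` likewise truncates and replaces existing content by default.

```scala
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Local-filesystem sketch (not HdfsHelper itself) of writeToHdfsFile's
// overwrite behaviour: writing to an existing path replaces its previous
// content entirely.
object WriteSmallFileSketch {
  def writeSmallFile(path: String, content: String): Unit =
    // Files.write truncates an existing file by default (CREATE +
    // TRUNCATE_EXISTING), mirroring the "overwrites the file" contract:
    Files.write(Paths.get(path), content.getBytes(StandardCharsets.UTF_8))
}
```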
67 changes: 47 additions & 20 deletions src/main/scala/com/spark_helper/monitoring/Monitor.scala
@@ -124,6 +124,14 @@ import java.lang.Throwable
* (actions/transformations) but rather from the orchestration of the
* pipelines.
*
* When the Monitor object is instantiated with the optional "logFolder"
* parameter, you don't need to wait for the end of the job (the call to
* saveReport()) to see what's going on: every report update is immediately
* saved to the file logFolder/current.ongoing. This gives a live view of
* the job, and even if the job is force-killed you can still easily see
* what happened.
*
* Source <a href="https://github.com/xavierguihot/spark_helper/blob/master/src
* /main/scala/com/spark_helper/monitoring/Monitor.scala">Monitor</a>
*
@@ -147,14 +155,19 @@ import java.lang.Throwable
* @param pointOfContact (optional) the persons in charge of the job
* @param additionalInfo (optional) anything you want written at the beginning
* of your report.
* @param logFolder (optional) the folder in which this report is stored
*/
class Monitor(
reportTitle: String = "", pointOfContact: String = "",
additionalInfo: String = ""
additionalInfo: String = "", logFolder: String = ""
) {

private var success = true
private var report = initiateReport()
private var report = ""

// Let's initiate the report with parameters given while instantiating the
// Monitor object:
initiateReport()

private val begining = Calendar.getInstance().getTimeInMillis()

@@ -181,20 +194,7 @@ class Monitor(
*
* @param text the text to append to the report
*/
def updateReport(text: String) = {

val before = lastReportUpdate
val now = DateHelper.now("HH:mm")

lastReportUpdate = now

val update = "[" + before + "-" + now + "]" + " " + text

// We also print the update to also have them within yarn logs:
println("MONITOR: " + update)

report += update + "\n"
}
def updateReport(text: String): Unit = updateReport(text, true)

/** Updates the report with some text and a success.
*
@@ -416,7 +416,7 @@
*/
def saveReport(
logFolder: String, purgeLogs: Boolean = false, purgeWindow: Int = 7
) = {
): Unit = {

// We add the job duration to the report:
val finalReport = report + (
@@ -444,11 +444,14 @@
logFolder + "/current" + validationFileExtension
)

// And if we "live logged", we remove the "current.ongoing" file:
HdfsHelper.deleteFile(logFolder + "/current.ongoing")

if (purgeLogs)
purgeOutdatedLogs(logFolder, purgeWindow)
}

private def initiateReport(): String = {
private def initiateReport(): Unit = {

var initialReport = ""

@@ -459,10 +462,34 @@
if (additionalInfo != "")
initialReport += additionalInfo + "\n"

initialReport + DateHelper.now("[HH:mm]") + " Begining\n"
initialReport += DateHelper.now("[HH:mm]") + " Begining"

updateReport(initialReport, false)
}

private def updateReport(text: String, withTimestamp: Boolean): Unit = {

val before = lastReportUpdate
val now = DateHelper.now("HH:mm")

lastReportUpdate = now

val update =
if (withTimestamp) "[" + before + "-" + now + "]" + " " + text
else text

report += update + "\n"

// We print the update to also have it within yarn logs:
println("MONITOR: " + update)

// And if the logFolder parameter was used to instantiate the Monitor
// object, we also update the live log file:
if (!logFolder.isEmpty)
HdfsHelper.writeToHdfsFile(report, logFolder + "/current.ongoing")
}

private def purgeOutdatedLogs(logFolder: String, purgeWindow: Int) = {
private def purgeOutdatedLogs(logFolder: String, purgeWindow: Int): Unit = {

val nDaysAgo = DateHelper.nDaysBefore(purgeWindow, "yyyyMMdd")

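The live-logging mechanism introduced by this commit can be sketched with a minimal local-filesystem analogue (class and folder names here are hypothetical; the real Monitor writes through HdfsHelper.writeToHdfsFile): on every update, the full report accumulated so far is rewritten to logFolder/current.ongoing, so the file always reflects the latest state even if the job is later force-killed.

```scala
import java.nio.file.{Files, Paths}
import java.nio.charset.StandardCharsets

// Minimal local-filesystem sketch (an assumption, not the real Monitor
// implementation) of the commit's live-logging behaviour: every update
// rewrites the whole report to <logFolder>/current.ongoing.
class LiveReportSketch(logFolder: String) {

  private var report = ""

  def updateReport(text: String): Unit = {
    report += text + "\n"
    Files.createDirectories(Paths.get(logFolder))
    // Rewrite the full report on each update; Files.write overwrites the
    // previous file content by default:
    Files.write(
      Paths.get(logFolder, "current.ongoing"),
      report.getBytes(StandardCharsets.UTF_8)
    )
  }
}
```

While the job runs, a reader can then simply `tail` the current.ongoing file to follow progress live.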
26 changes: 26 additions & 0 deletions src/test/scala/com/spark_helper/monitoring/MonitorTest.scala
@@ -90,6 +90,32 @@ class MonitorTest extends FunSuite with SharedSparkContext {
assert(!monitor.isSuccess())
}

test("Check current.ongoing Live Monitoring") {

// We remove previous data:
HdfsHelper.deleteFolder("src/test/resources/logs")

val monitor = new Monitor(
"My Processing", "xguihot@gmail.com",
"Documentation: https://github.com/xavierguihot/spark_helper",
logFolder = "src/test/resources/logs"
)
monitor.updateReport("Doing something: success")

val reportStoredLines = sc.textFile(
"src/test/resources/logs/current.ongoing"
).collect().toList.mkString("\n")
val expectedReport = (
" My Processing\n" +
"\n" +
"Point of contact: xguihot@gmail.com\n" +
"Documentation: https://github.com/xavierguihot/spark_helper\n" +
"[..:..] Begining\n" +
"[..:..-..:..] Doing something: success"
)
assert(removeTimeStamps(reportStoredLines) === expectedReport)
}

test("Add Error Stack Trace to Report") {

val monitor = new Monitor()
