Skip to content

Commit

Permalink
Patches for MR3
Browse files Browse the repository at this point in the history
  • Loading branch information
Sungwoo Park committed Mar 16, 2021
1 parent 043ba81 commit 979a377
Show file tree
Hide file tree
Showing 362 changed files with 12,177 additions and 3,261 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,10 @@ private JsonParserFactory() {
* @return the appropriate JsonParser to print a JSONObject into outputStream.
*/
public static JsonParser getParser(HiveConf conf) {
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("tez")) {
String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
if (engine.equals("mr3") || engine.equals("tez")) {
return new TezJsonParser();
}
if (HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE).equals("spark")) {
return new SparkJsonParser();
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -190,14 +190,10 @@ public static boolean canRenderInPlace(HiveConf conf) {
String engine = HiveConf.getVar(conf, HiveConf.ConfVars.HIVE_EXECUTION_ENGINE);
boolean inPlaceUpdates = false;

if (engine.equals("tez")) {
if (engine.equals("mr3") || engine.equals("tez")) {
inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.TEZ_EXEC_INPLACE_PROGRESS);
}

if (engine.equals("spark")) {
inPlaceUpdates = HiveConf.getBoolVar(conf, HiveConf.ConfVars.SPARK_EXEC_INPLACE_PROGRESS);
}

return inPlaceUpdates && isUnixTerminal();
}

Expand Down
142 changes: 136 additions & 6 deletions common/src/java/org/apache/hadoop/hive/conf/HiveConf.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,7 @@ public class HiveConf extends Configuration {

private Pattern modWhiteListPattern = null;
private volatile boolean isSparkConfigUpdated = false;
private volatile boolean isMr3ConfigUpdated = false;
private static final int LOG_PREFIX_LENGTH = 64;

public boolean getSparkConfigUpdated() {
Expand All @@ -105,6 +106,14 @@ public void setSparkConfigUpdated(boolean isSparkConfigUpdated) {
this.isSparkConfigUpdated = isSparkConfigUpdated;
}

public boolean getMr3ConfigUpdated() {
return isMr3ConfigUpdated;
}

public void setMr3ConfigUpdated(boolean isMr3ConfigUpdated) {
this.isMr3ConfigUpdated = isMr3ConfigUpdated;
}

public interface EncoderDecoder<K, V> {
V encode(K key);
K decode(V value);
Expand Down Expand Up @@ -3002,7 +3011,7 @@ public static enum ConfVars {
HIVE_SSL_PROTOCOL_BLACKLIST("hive.ssl.protocol.blacklist", "SSLv2,SSLv3",
"SSL Versions to disable for all Hive Servers"),

HIVE_PRIVILEGE_SYNCHRONIZER("hive.privilege.synchronizer", true,
HIVE_PRIVILEGE_SYNCHRONIZER("hive.privilege.synchronizer", false,
"Whether to synchronize privileges from external authorizer periodically in HS2"),
HIVE_PRIVILEGE_SYNCHRONIZER_INTERVAL("hive.privilege.synchronizer.interval",
"1800s", new TimeValidator(TimeUnit.SECONDS),
Expand Down Expand Up @@ -3505,10 +3514,11 @@ public static enum ConfVars {
HIVE_DECODE_PARTITION_NAME("hive.decode.partition.name", false,
"Whether to show the unquoted partition names in query results."),

HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr", new StringSet(true, "mr", "tez", "spark"),
"Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR\n" +
// do not remove 'tez' which might be necessary, e.g., when connecting from Hue
HIVE_EXECUTION_ENGINE("hive.execution.engine", "mr3", new StringSet(true, "mr3", "tez"),
"Chooses execution engine. Options are: mr3 or tez. While MR\n" +
"remains the default engine for historical reasons, it is itself a historical engine\n" +
"and is deprecated in Hive 2 line. It may be removed without further warning."),
"and is deprecated in Hive 2 line. It may be removed without further warning. tez and spark are not supported."),

HIVE_EXECUTION_MODE("hive.execution.mode", "container", new StringSet("container", "llap"),
"Chooses whether query fragments will run in container or in llap"),
Expand Down Expand Up @@ -3717,7 +3727,7 @@ public static enum ConfVars {
"Turn on Tez' auto reducer parallelism feature. When enabled, Hive will still estimate data sizes\n" +
"and set parallelism estimates. Tez will sample source vertices' output sizes and adjust the estimates at runtime as\n" +
"necessary."),
TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR("hive.tez.llap.min.reducer.per.executor", 0.95f,
TEZ_LLAP_MIN_REDUCER_PER_EXECUTOR("hive.tez.llap.min.reducer.per.executor", 0.2f,
"If above 0, the min number of reducers for auto-parallelism for LLAP scheduling will\n" +
"be set to this fraction of the number of executors."),
TEZ_MAX_PARTITION_FACTOR("hive.tez.max.partition.factor", 2f,
Expand Down Expand Up @@ -4458,7 +4468,127 @@ public static enum ConfVars {
"This parameter enables a number of optimizations when running on blobstores:\n" +
"(1) If hive.blobstore.use.blobstore.as.scratchdir is false, force the last Hive job to write to the blobstore.\n" +
"This is a performance optimization that forces the final FileSinkOperator to write to the blobstore.\n" +
"See HIVE-15121 for details.");
"See HIVE-15121 for details."),

MR3_CLIENT_CONNECT_TIMEOUT("hive.mr3.client.connect.timeout",
"60000ms", new TimeValidator(TimeUnit.MILLISECONDS),
"Timeout for Hive to establish connection to MR3 Application Master."),
// ContainerWorker
// MR3_CONTAINER_MAX_JAVA_HEAP_FRACTION is not passed to ContainerWorker. Rather it is written to
// MR3Conf which is passed to DAGAppMaster and ContainerWorkers. That is, it is a part of mr3-conf.pb
// which is shared by both DAGAppMaster and ContainerWorkers as a LocalResource.
// It is fixed per MR3Session, i.e., at the time of creating a new MR3Session.
MR3_CONTAINER_MAX_JAVA_HEAP_FRACTION("hive.mr3.container.max.java.heap.fraction", 0.8f,
"Fraction of task memory to be used as Java heap. Fixed at the time of creating each MR3Session."),
// for ContainerGroup (in DAG)
// These configurations are used only when creating ContainerGroup.
// Hence, they do not affect MR3Conf (mr3-conf.pb) passed to DAGAppMaster and ContainerWorkers.
MR3_CONTAINERGROUP_SCHEME("hive.mr3.containergroup.scheme", "all-in-one",
new StringSet("all-in-one", "per-map-reduce", "per-vertex"),
"Scheme for assigning Vertexes to ContainerGroups"),
MR3_CONTAINER_ENV("hive.mr3.container.env", null,
"Environment string for ContainerGroups"),
MR3_CONTAINER_JAVA_OPTS("hive.mr3.container.java.opts", null,
"Java options for ContainerGroups"),
MR3_CONTAINER_COMBINE_TASKATTEMPTS("hive.mr3.container.combine.taskattempts", true,
"Allow multiple concurrent tasks in the same container"),
MR3_CONTAINER_REUSE("hive.mr3.container.reuse", true,
"Allow container reuse for running different tasks"),
MR3_CONTAINER_MIX_TASKATTEMPTS("hive.mr3.container.mix.taskattempts", true,
"Allow concurrent tasks from different DAGs in the same container"),
MR3_CONTAINER_USE_PER_QUERY_CACHE("hive.mr3.container.use.per.query.cache", true,
"Use per-query cache shared by all tasks in the same container"),
// for DAG
// This configuration is used only when creating DAG.
// Hence, it does not affect MR3Conf (mr3-conf.pb) passed to DAGAppMaster and ContainerWorkers.
MR3_CONTAINER_STOP_CROSS_DAG_REUSE("hive.mr3.container.stop.cross.dag.reuse", false,
"Stop cross-DAG container reuse for ContainerGroups"),
// common to Vertex, ContainerGroup, LLAP Daemon
MR3_RESOURCE_VCORES_DIVISOR("hive.mr3.resource.vcores.divisor", 1,
"Divisor for CPU cores, between 1 and 1000"),
// Vertex
MR3_MAP_TASK_MEMORY_MB("hive.mr3.map.task.memory.mb", 1024,
"Memory allocated to each mapper, in MB"),
MR3_REDUCE_TASK_MEMORY_MB("hive.mr3.reduce.task.memory.mb", 1024,
"Memory allocated to each reducer, in MB"),
MR3_MAP_TASK_VCORES("hive.mr3.map.task.vcores", 1,
"CPU cores allocated to each mapper"),
MR3_REDUCE_TASK_VCORES("hive.mr3.reduce.task.vcores", 1,
"CPU cores allocated to each reducer"),
// ContainerGroup -- All-in-One
MR3_ALLINONE_CONTAINERGROUP_MEMORY_MB("hive.mr3.all-in-one.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for All-in-One, in MB"),
MR3_ALLINONE_CONTAINERGROUP_VCORES("hive.mr3.all-in-one.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for All-in-One"),
// ContainerGroup -- Per-Map-Reduce and Per-Vertex
// Map/Reduce ContainerGroup size can be different from Vertex.taskResource, e.g.,
// 'combine TaskAttempts' is enabled
MR3_MAP_CONTAINERGROUP_MEMORY_MB("hive.mr3.map.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for mappers, in MB"),
MR3_REDUCE_CONTAINERGROUP_MEMORY_MB("hive.mr3.reduce.containergroup.memory.mb", 1024,
"Memory allocated to each ContainerGroup for reducers, in MB"),
MR3_MAP_CONTAINERGROUP_VCORES("hive.mr3.map.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for mappers"),
MR3_REDUCE_CONTAINERGROUP_VCORES("hive.mr3.reduce.containergroup.vcores", 1,
"CPU cores allocated to each ContainerGroup for reducers"),
// use LLAP IO for All-in-One and Per-Map-Reduce schemes when LLAP_IO_ENABLED = true
MR3_LLAP_HEADROOM_MB("hive.mr3.llap.headroom.mb", 1024,
"Memory allocated to JVM headroom when LLAP/IO is enabled"),
MR3_LLAP_DAEMON_TASK_MEMORY_MB("hive.mr3.llap.daemon.task.memory.mb", 0,
"Memory allocated to a DaemonTaskAttempt for LLAP/IO, in MB"),
MR3_LLAP_DAEMON_TASK_VCORES("hive.mr3.llap.daemon.task.vcores", 0,
"CPU cores allocated to a DaemonTaskAttempt for LLAP I/O"),
MR3_LLAP_ORC_MEMORY_PER_THREAD_MB("hive.mr3.llap.orc.memory.per.thread.mb", 1024,
"Memory allocated to each ORC manager in low-level LLAP I/O threads, in MB"),
// EXEC
MR3_EXEC_SUMMARY("hive.mr3.exec.print.summary", false,
"Display breakdown of execution steps, for every query executed by the shell"),
MR3_EXEC_INPLACE_PROGRESS("hive.mr3.exec.inplace.progress", true,
"Update job execution progress in-place in the terminal"),
// daemon ShuffleHandler
MR3_USE_DAEMON_SHUFFLEHANDLER("hive.mr3.use.daemon.shufflehandler", 0,
"Number of daemon ShuffleHandlers in every non-local ContainerWorker"),
// HiveServer2
HIVE_SERVER2_MR3_SHARE_SESSION("hive.server2.mr3.share.session", false,
"Use a common MR3Session to be shared by all HiveSessions"),
// for internal use only
// -1: not stored in HiveConf yet
HIVE_QUERY_ESTIMATE_REDUCE_NUM_TASKS("hive.query.estimate.reducer.num.tasks.internal", -1,
"Estimate number of reducer tasks based on MR3SessionManagerImpl.getEstimateNumTasks() for each query"),
MR3_BUCKET_MAPJOIN_ESTIMATE_NUM_NODES("hive.mr3.bucket.mapjoin.estimate.num.nodes", -1,
"Estimate number of nodes for converting to bucket mapjoin"),

// runtime
MR3_MAPJOIN_INTERRUPT_CHECK_INTERVAL("hive.mr3.mapjoin.interrupt.check.interval", 100000L,
"Interval at which HashTableLoader checks the interrupt state"),
MR3_DAG_ADDITIONAL_CREDENTIALS_SOURCE("hive.mr3.dag.additional.credentials.source", "",
"Comma separated list of additional paths for obtaining DAG Credentials"),

// fault tolerance
MR3_AM_TASK_MAX_FAILED_ATTEMPTS("hive.mr3.am.task.max.failed.attempts", 3,
"Max number of attempts for each Task"),

// speculative execution
MR3_AM_TASK_CONCURRENT_RUN_THRESHOLD_PERCENT("hive.mr3.am.task.concurrent.run.threshold.percent", 100,
"Percentage of TaskAttempts that complete before starting speculative execution. " +
"Can be set to an integer between 1 and 100. " +
"If set to 100, speculative execution of TaskAttempts is disabled."),

// deleting Vertex-local directory
MR3_DAG_DELETE_VERTEX_LOCAL_DIRECTORY("hive.mr3.delete.vertex.local.directory", false,
"Delete Vertex-local directories in ContainerWork when all destination Vertexes complete"),

// high availability
MR3_ZOOKEEPER_APPID_NAMESPACE("hive.mr3.zookeeper.appid.namespace", "mr3AppId",
"ZooKeeper namespace for sharing Application ID"),

// Kubernetes
HIVE_MR3_LOCALIZE_SESSION_JARS("hive.mr3.localize.session.jars", true,
"Localize session jars"),

// Compaction using MR3
HIVE_MR3_COMPACTION_USING_MR3("hive.mr3.compaction.using.mr3", false,
"Enable compaction using mr3. High Availability needs to be enabled.");

public final String varname;
public final String altName;
Expand Down
7 changes: 7 additions & 0 deletions common/src/java/org/apache/hadoop/hive/ql/log/PerfLogger.java
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ public class PerfLogger {
public static final String TEZ_GET_SESSION = "TezGetSession";
public static final String SAVE_TO_RESULTS_CACHE = "saveToResultsCache";

public static final String MR3_SUBMIT_TO_RUNNING = "MR3SubmitToRunningDag";
public static final String MR3_BUILD_DAG = "MR3BuildDag";
public static final String MR3_SUBMIT_DAG = "MR3SubmitDag";
public static final String MR3_RUN_DAG = "MR3RunDag";
public static final String MR3_CREATE_VERTEX = "MR3CreateVertex";
public static final String MR3_RUN_VERTEX = "MR3RunVertex";

public static final String SPARK_SUBMIT_TO_RUNNING = "SparkSubmitToRunning";
public static final String SPARK_BUILD_PLAN = "SparkBuildPlan";
public static final String SPARK_BUILD_RDD_GRAPH = "SparkBuildRDDGraph";
Expand Down
10 changes: 10 additions & 0 deletions data/conf/hive-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,14 @@
<value>false</value>
</property>

<property>
<name>mr3.container.runtime.auto.start.input</name>
<value>true</value>
</property>

<property>
<name>mr3.container.localize.python.working.dir.unsafe</name>
<value>true</value>
</property>

</configuration>
124 changes: 123 additions & 1 deletion data/conf/llap/hive-site.xml
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@

<property>
<name>hive.execution.engine</name>
<value>tez</value>
<value>mr3</value>
<description>Whether to use MR or Tez</description>
</property>

Expand Down Expand Up @@ -358,4 +358,126 @@
<value>1024</value>
</property>

<!-- MR3 -->

<property>
<name>hive.llap.execution.mode</name>
<value>all</value>
</property>

<property>
<name>hive.llap.io.enabled</name>
<value>true</value>
</property>

<property>
<name>hive.llap.io.memory.size</name>
<value>4Gb</value>
</property>

<property>
<name>hive.mr3.llap.headroom.mb</name>
<value>0</value>
</property>

<property>
<name>hive.llap.io.threadpool.size</name>
<value>2</value>
</property>

<property>
<name>hive.mr3.container.combine.taskattempts</name>
<value>true</value>
</property>

<property>
<name>hive.mr3.container.reuse</name>
<value>true</value>
</property>

<property>
<name>hive.mr3.containergroup.scheme</name>
<value>all-in-one</value>
</property>

<property>
<name>hive.mr3.container.max.java.heap.fraction</name>
<value>0.8f</value>
</property>

<property>
<name>hive.mr3.map.task.memory.mb</name>
<value>2048</value>
</property>

<property>
<name>hive.mr3.map.task.vcores</name>
<value>1</value>
</property>

<property>
<name>hive.mr3.reduce.task.memory.mb</name>
<value>2048</value>
</property>

<property>
<name>hive.mr3.reduce.task.vcores</name>
<value>1</value>
</property>

<property>
<name>hive.mr3.all-in-one.containergroup.memory.mb</name>
<value>12288</value>
</property>

<property>
<name>hive.mr3.all-in-one.containergroup.vcores</name>
<value>6</value>
</property>

<property>
<name>mr3.runtime</name>
<value>tez</value>
</property>

<property>
<name>mr3.master.mode</name>
<value>local-thread</value>
</property>

<property>
<name>mr3.am.worker.mode</name>
<value>local</value>
</property>

<property>
<name>mr3.am.resource.memory.mb</name>
<value>18432</value>
</property>

<property>
<name>mr3.am.local.resourcescheduler.max.memory.mb</name>
<value>16384</value>
</property>

<property>
<name>mr3.am.local.resourcescheduler.max.cpu.cores</name>
<value>128</value>
</property>

<property>
<name>mr3.container.localize.python.working.dir.unsafe</name>
<value>true</value>
</property>

<property>
<name>mr3.container.runtime.auto.start.input</name>
<value>true</value>
</property>

<property>
<name>mr3.container.localize.python.working.dir.unsafe</name>
<value>true</value>
</property>

</configuration>
Loading

0 comments on commit 979a377

Please sign in to comment.