demo.masouros 4 years ago
Parent
Current commit
af432af899
1 changed file with 394 additions and 264 deletions

+ 394 - 264
src/python/config-generator-all.py

@@ -5,7 +5,7 @@
 #  This is an extremely simplified version meant only for tutorials
 #
 from __future__ import print_function
-import adddeps  #  fix sys.path
+# import adddeps  #  fix sys.path
 import sys
 
 import opentuner
@@ -29,39 +29,37 @@ class GccFlagsTuner(MeasurementInterface):
     APPLICATION PROPERTIES
     ======================
     """
-    # spark.driver.cores
-    # manipulator.add_parameter(EnumParameter('driver_cores', ['1', '2', '4'])) # only in cluster mode -> in client mode through spark.yarn.am.cores
     # spark.driver.maxResultSize
     manipulator.add_parameter(EnumParameter('spark_driver_maxresultsize', ['500m', '1g', '4g']))
     # spark.driver.memory
     manipulator.add_parameter(EnumParameter('spark_driver_memory', ['500m', '1g', '8g']))
-    # spark.driver.memoryOverhead
-    # manipulator.add_parameter(EnumParameter('driver_mem_overhead', ['500m', '1g', '8g'])) # only in cluster mode
+    # spark.executor.memory
+    manipulator.add_parameter(EnumParameter('spark_executor_memory', ['10g', '50g', '80g']))
+    # spark.executor.pyspark.memory
+    manipulator.add_parameter(EnumParameter('spark_executor_pyspark_memory', ['4g', '8g', '16g']))
+    # spark.executor.memoryOverhead
+    manipulator.add_parameter(EnumParameter('spark_executor_memory_overhead', ['500m', '2g', '4g']))
+    # spark.executor.instances
+    manipulator.add_parameter(EnumParameter('spark_executor_instances', ['1', '4', '10']))
 
+    # NOT USED
+    # spark.master
+    # spark.app.name
+    # spark.submit.deployMode
+    # spark.driver.cores # only in cluster mode
+    # spark.driver.memoryOverhead # only in cluster mode
+    # spark.resources.discoveryPlugin
     # spark.driver.resource.{resourceName}.amount
     # spark.driver.resource.{resourceName}.discoveryScript
-    # spark.driver.resource.{resourceName}.discoveryScript
     # spark.driver.resource.{resourceName}.vendor
-    # spark.resources.discoveryPlugin
-    # spark.executor.memory
-    manipulator.add_parameter(EnumParameter('spark_executor_memory', ['10g', '50g', '100g']))
-    # spark.executor.pyspark.memory
-    # spark.executor.memoryOverhead
-    # spark.executor.pyspark.memory
-    # spark.executor.memoryOverhead
-    # spark.executor.memory
-    # spark.executor.pyspark.memory
-    # spark.executor.resource.{resourceName}.amount
-    # spark.executor.resource.{resourceName}.discoveryScript
-    # spark.executor.resource.{resourceName}.discoveryScript
     # spark.executor.resource.{resourceName}.vendor
+    # spark.executor.resource.{resourceName}.discoveryScript
+    # spark.executor.resource.{resourceName}.amount
     # spark.extraListeners
-    # spark.local.dir
-    # spark.logConf
-    # spark.master
-    # spark.submit.deployMode
+    # spark.local.dir # might be important for SSD vs HDD
     # spark.log.callerContext
-    # spark.driver.supervise
+    # spark.driver.supervise # Only in standalone or Mesos cluster mode
+    # spark.logConf
     # spark.driver.log.dfsDir
     # spark.driver.log.persistToDfs.enabled
     # spark.history.fs.driverlog.cleaner.enabled
@@ -72,62 +70,55 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.driver.log.layout
     # spark.driver.log.dfsDir
     # spark.driver.log.allowErasureCoding
+
     """
-    ======================
+    ===================
     RUNTIME ENVIRONMENT
-    ======================
+    ===================
     """
+    # spark.python.worker.memory
+    manipulator.add_parameter(EnumParameter('spark_python_worker_memory', ['64m', '512m', '4g']))
+    # spark.python.worker.reuse
+    manipulator.add_parameter(EnumParameter('spark_python_worker_reuse', ['true', 'false']))
+
+    # NOT USED
     # spark.driver.extraClassPath
     # spark.driver.defaultJavaOptions
     # spark.driver.extraJavaOptions
-    # spark.driver.memory
-    # spark.driver.extraJavaOptions
-    # spark.driver.memory
-    # spark.driver.defaultJavaOptions
     # spark.driver.extraLibraryPath
     # spark.driver.userClassPathFirst
     # spark.executor.extraClassPath
     # spark.executor.defaultJavaOptions
     # spark.executor.extraJavaOptions
-    # spark.executor.extraJavaOptions
-    # spark.executor.defaultJavaOptions
     # spark.executor.extraLibraryPath
-    # spark.executor.logs.rolling.maxRetainedFiles
     # spark.executor.logs.rolling.enableCompression
     # spark.executor.logs.rolling.maxSize
-    # spark.executor.logs.rolling.maxRetainedFiles
     # spark.executor.logs.rolling.strategy
     # spark.executor.logs.rolling.time.interval
-    # spark.executor.logs.rolling.maxSize
-    # spark.executor.logs.rolling.time.interval
     # spark.executor.logs.rolling.maxRetainedFiles
     # spark.executor.userClassPathFirst
-    # spark.driver.userClassPathFirst
     # spark.executorEnv.[EnvironmentVariableName]
     # spark.redaction.regex
     # spark.python.profile
     # spark.python.profile.dump
-    # spark.python.worker.memory
-    # spark.python.worker.reuse
     # spark.files
     # spark.submit.pyFiles
     # spark.jars
     # spark.jars.packages
-    # spark.jars.ivySettings
     # spark.jars.excludes
-    # spark.jars.packages
     # spark.jars.ivy
-    # spark.jars.packages
     # spark.jars.ivySettings
-    # spark.jars.packages
-    # spark.jars.repositories
     # spark.jars.repositories
-    # spark.jars.packages
     # spark.pyspark.driver.python
     # spark.pyspark.python
-    # spark.pyspark.python
+
+    """
+    ================
+    SHUFFLE BEHAVIOR
+    ================
+    """
     # spark.reducer.maxSizeInFlight
-    
+    manipulator.add_parameter(EnumParameter('spark_reducer_maxsizeinflight', ['12m', '48m', '100m']))
     # spark.reducer.maxReqsInFlight
     manipulator.add_parameter(EnumParameter('spark_reducer_maxreqsinflight', ['2147483647', '200000000']))
     # spark.reducer.maxBlocksInFlightPerAddress
@@ -137,7 +128,7 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.io.compression.codec
     manipulator.add_parameter(EnumParameter('spark_io_compression_codec', ['snappy', 'lzf', 'lz4']))
     # spark.shuffle.file.buffer
-    manipulator.add_parameter(EnumParameter('spark_shuffle_file_buffer', ['16k', '32k', '64k']))
+    manipulator.add_parameter(EnumParameter('spark_shuffle_file_buffer', ['8k', '32k', '128k']))
     # spark.shuffle.io.maxRetries
     manipulator.add_parameter(EnumParameter('spark_shuffle_io_maxretries', ['2', '3', '6']))
     # spark.shuffle.io.numConnectionsPerPeer
@@ -146,20 +137,29 @@ class GccFlagsTuner(MeasurementInterface):
     manipulator.add_parameter(EnumParameter('spark_shuffle_io_preferdirectbufs', ['true', 'false']))
     # spark.shuffle.io.retryWait
     manipulator.add_parameter(EnumParameter('spark_shuffle_io_retrywait', ['3s', '5s', '8s']))
-    # spark.shuffle.io.backLog
     # spark.shuffle.service.enabled
-    # spark.shuffle.service.enabled
-    # spark.dynamicAllocation.enabled
-    # spark.shuffle.service.port
+    manipulator.add_parameter(EnumParameter('spark_shuffle_service_enabled', ['true', 'false']))
     # spark.shuffle.service.index.cache.size
-    # spark.shuffle.maxChunksBeingTransferred
+    manipulator.add_parameter(EnumParameter('spark_shuffle_service_index_cache_size', ['10m', '100m', '500m']))
     # spark.shuffle.sort.bypassMergeThreshold
-    manipulator.add_parameter(EnumParameter('spark_shuffle_sort_bypassmergethreshold', ['100', '200', '400']))
+    manipulator.add_parameter(EnumParameter('spark_shuffle_sort_bypassmergethreshold', ['50', '200', '800']))
     # spark.shuffle.spill.compress
     manipulator.add_parameter(EnumParameter('spark_shuffle_spill_compress', ['true', 'false']))
+
+    # NOT USED
+    # spark.shuffle.maxChunksBeingTransferred
     # spark.shuffle.accurateBlockThreshold
     # spark.shuffle.registration.timeout
     # spark.shuffle.registration.maxAttempts
+    # spark.shuffle.service.port
+    # spark.shuffle.io.backLog
+
+    """
+    ========
+    SPARK UI
+    ========
+    """
+    # NOT USED
     # spark.eventLog.logBlockUpdates.enabled
     # spark.eventLog.enabled
     # spark.eventLog.longForm.enabled
@@ -176,56 +176,52 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.eventLog.rolling.maxFileSize
     # spark.eventLog.rolling.enabled=true
     # spark.ui.dagGraph.retainedRootRDDs
-    manipulator.add_parameter(EnumParameter('spark_ui_daggraph_retainedrootrdds', ['2147483647', '200000000']))
     # spark.ui.enabled
-    manipulator.add_parameter(EnumParameter('spark_ui_enabled', ['true', 'false']))
     # spark.ui.killEnabled
     # spark.ui.liveUpdate.period
-    manipulator.add_parameter(EnumParameter('spark_ui_liveupdate_period', ['50ms', '100ms', '200ms', '-1'])) # ## caution -1
     # spark.ui.liveUpdate.minFlushPeriod
     # spark.ui.port
     # spark.ui.retainedJobs
-    manipulator.add_parameter(EnumParameter('spark_ui_retainedjobs', ['500', '1000', '2000']))
     # spark.ui.retainedStages
-    manipulator.add_parameter(EnumParameter('spark_ui_retainedstages', ['500', '1000', '2000']))
     # spark.ui.retainedTasks
-    manipulator.add_parameter(EnumParameter('spark_ui_retainedtasks', ['50000', '100000', '200000']))
     # spark.ui.reverseProxy
     # spark.ui.reverseProxyUrl
     # spark.ui.proxyRedirectUri
     # spark.ui.showConsoleProgress
     # spark.ui.custom.executor.log.url
     # spark.worker.ui.retainedExecutors
-    manipulator.add_parameter(EnumParameter('spark_worker_ui_retainedexecutors', ['500', '1000', '2000']))
     # spark.worker.ui.retainedDrivers
-    manipulator.add_parameter(EnumParameter('spark_worker_ui_retaineddrivers', ['500', '1000', '2000']))
     # spark.sql.ui.retainedExecutions
-    manipulator.add_parameter(EnumParameter('spark_sql_ui_retainedexecutions', ['500', '1000', '2000']))
     # spark.streaming.ui.retainedBatches
-    manipulator.add_parameter(EnumParameter('spark_streaming_ui_retainedbatches', ['500', '1000', '2000']))
     # spark.ui.retainedDeadExecutors
-    manipulator.add_parameter(EnumParameter('spark_ui_retaineddeadexecutors', ['50', '100', '200']))
     # spark.ui.filters
     # spark.<class name of filter>.param.<param name>=<value>
     # spark.ui.filters=com.test.filter1
     # spark.com.test.filter1.param.name1=foo
     # spark.com.test.filter1.param.name2=bar
     # spark.ui.requestHeaderSize
-    manipulator.add_parameter(EnumParameter('spark_ui_requestheadersize', ['4k', '8k', '16k']))
+
+    """
+    =============================
+    COMPRESSION AND SERIALIZATION
+    =============================
+    """
     # spark.broadcast.compress
     manipulator.add_parameter(EnumParameter('spark_broadcast_compress', ['true', 'false']))
     # spark.checkpoint.compress
+    manipulator.add_parameter(EnumParameter('spark_checkpoint_compress', ['true', 'false']))
     # spark.io.compression.lz4.blockSize
     manipulator.add_parameter(EnumParameter('spark_io_compression_lz4_blocksize', ['16k', '32k', '64k']))
     # spark.io.compression.snappy.blockSize
     manipulator.add_parameter(EnumParameter('spark_io_compress_snappy_blocksize', ['16k', '32k', '64k']))
     # spark.io.compression.zstd.level
+    manipulator.add_parameter(EnumParameter('spark_io_compression_zstd_level', ['0', '1', '4']))
     # spark.io.compression.zstd.bufferSize
-    # spark.kryo.classesToRegister
+    manipulator.add_parameter(EnumParameter('spark_io_compression_zstd_buffersize', ['8k', '32k', '128k']))
     # spark.kryo.referenceTracking
+    manipulator.add_parameter(EnumParameter('spark_kryo_referencetracking', ['true', 'false']))
     # spark.kryo.registrationRequired
-    # spark.kryo.registrator
-    # spark.kryo.classesToRegister
+    manipulator.add_parameter(EnumParameter('spark_kryo_registrationrequired', ['true', 'false']))
     # spark.kryo.unsafe
     manipulator.add_parameter(EnumParameter('spark_kryo_unsafe', ['true', 'false']))
     # spark.kryoserializer.buffer.max
@@ -235,16 +231,27 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.rdd.compress
     manipulator.add_parameter(EnumParameter('spark_rdd_compress', ['true', 'false']))
     # spark.serializer
+    manipulator.add_parameter(EnumParameter('spark_serializer', ['org.apache.spark.serializer.JavaSerializer', 'org.apache.spark.serializer.KryoSerializer']))
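+    # (Kryo is generally faster and more compact than Java serialization,
+    # so the serializer choice itself is exposed as a tunable here.)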
     # spark.serializer.objectStreamReset
     manipulator.add_parameter(EnumParameter('spark_serializer_objectstreamreset', ['-1', '50', '100', '200']))
+    
+    # NOT USED
+    # spark.kryo.classesToRegister
+    # spark.kryo.registrator
+
+    """
+    =================
+    MEMORY MANAGEMENT
+    =================
+    """
     # spark.memory.fraction
     manipulator.add_parameter(EnumParameter('spark_memory_fraction', ['0.4', '0.6', '0.7']))
     # spark.memory.storageFraction
-    # spark.memory.fraction
+    manipulator.add_parameter(EnumParameter('spark_memory_storagefraction', ['0.2', '0.5', '0.8']))
     # spark.memory.offHeap.enabled
     manipulator.add_parameter(EnumParameter('spark_memory_offheap_enabled', ['true', 'false']))
     # spark.memory.offHeap.size
-    manipulator.add_parameter(EnumParameter('spark_memory_offheap_size', ['25m', '50m'])) # caution: it has no default, it is 0
+    manipulator.add_parameter(EnumParameter('spark_memory_offheap_size', ['25m', '50m']))
     # spark.storage.replication.proactive
     manipulator.add_parameter(EnumParameter('spark_storage_repllication_proactive', ['true', 'false']))
     # spark.cleaner.periodicGC.interval
@@ -257,38 +264,58 @@ class GccFlagsTuner(MeasurementInterface):
     manipulator.add_parameter(EnumParameter('spark_cleaner_referencetracking_blocking_shuffle', ['true', 'false']))
     # spark.cleaner.referenceTracking.cleanCheckpoints
     manipulator.add_parameter(EnumParameter('spark_cleaner_referencetracking_cleancheckpoints', ['true', 'false']))
+
+    """
+    ==================
+    EXECUTION BEHAVIOR
+    ==================
+    """
     # spark.broadcast.blockSize
-    manipulator.add_parameter(EnumParameter('spark_broadcast_blocksize', ['2m', '4m', '8m']))
+    manipulator.add_parameter(EnumParameter('spark_broadcast_blocksize', ['1m', '4m', '16m']))
     # spark.broadcast.checksum
+    manipulator.add_parameter(EnumParameter('spark_broadcast_checksum', ['true', 'false']))
     # spark.executor.cores
     manipulator.add_parameter(EnumParameter('spark_executor_cores', ['1', '4', '8', '16', '24', '48']))
     # spark.default.parallelism
+    manipulator.add_parameter(EnumParameter('spark_default_parallelism', ['1', '8', '24']))
     # spark.executor.heartbeatInterval
     manipulator.add_parameter(EnumParameter('spark_executor_heartbeatinterval', ['5s', '10s', '20s']))
     # spark.files.fetchTimeout
     manipulator.add_parameter(EnumParameter('spark_files_fetchtimeout', ['30s', '60s', '120s']))
     # spark.files.useFetchCache
-    # spark.files.overwrite
+    manipulator.add_parameter(EnumParameter('spark_files_usefetchcache', ['true', 'false']))
     # spark.files.maxPartitionBytes
     manipulator.add_parameter(EnumParameter('spark_files_maxpartitionbytes', ['70000000', '134217728', '190000000']))
     # spark.files.openCostInBytes
     manipulator.add_parameter(EnumParameter('spark_files_opencostinbytes', ['2000000', '4194304', '9000000']))
-    # spark.hadoop.cloneConf
-    # spark.hadoop.validateOutputSpecs
     # spark.storage.memoryMapThreshold
     manipulator.add_parameter(EnumParameter('spark_storage_memorymapthreshold', ['1m', '2m', '4m']))
+
+    # NOT USED
     # spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version
+    # spark.files.overwrite
+    # spark.hadoop.cloneConf
+    # spark.hadoop.validateOutputSpecs
+
+    """
+    ================
+    EXECUTOR METRICS
+    ================
+    """
+    # spark.executor.metrics.pollingInterval
+    manipulator.add_parameter(EnumParameter('spark_executor_metrics_pollinginterval', ['0', '2s', '5s']))
+
+    # NOT USED
     # spark.eventLog.logStageExecutorMetrics
     # spark.executor.processTreeMetrics.enabled
-    # spark.executor.metrics.pollingInterval
+
+    """
+    ==========
+    NETWORKING
+    ==========
+    """
     # spark.rpc.message.maxSize
-    manipulator.add_parameter(EnumParameter('spark_rpc_message_maxsize', ['64', '128', '256'])) # probably without the "m" suffix
-    # spark.blockManager.port
-    # spark.driver.blockManager.port
-    # spark.driver.bindAddress
-    # spark.driver.host
-    # spark.driver.port
-    # spark.rpc.io.backLog
+    manipulator.add_parameter(EnumParameter('spark_rpc_message_maxsize', ['16', '128', '512'])) # probably without the "m" suffix
     # spark.network.timeout
     manipulator.add_parameter(EnumParameter('spark_network_timeout', ['60s', '120s', '240s']))
     # spark.storage.blockManagerSlaveTimeoutMs
@@ -308,15 +335,27 @@ class GccFlagsTuner(MeasurementInterface):
     manipulator.add_parameter(EnumParameter('spark_rpc_lookuptimeout', ['60s', '120s', '240s']))
     # spark.core.connection.ack.wait.timeout
     manipulator.add_parameter(EnumParameter('spark_core_connection_ack_wait_timeout', ['60s', '120s', '240s']))
-    # spark.network.timeout
     # spark.network.maxRemoteBlockSizeFetchToMem
     manipulator.add_parameter(EnumParameter('spark_network_maxremoteblocksizefetchtomem', ['2147483147', '200000000']))
-    # spark.cores.max
-    # spark.deploy.defaultCores
+    # spark.blockManager.port
+
+    # NOT USED
+    # spark.driver.blockManager.port
+    # spark.driver.bindAddress
+    # spark.driver.host
+    # spark.driver.port
+    # spark.rpc.io.backLog
+    # spark.network.timeout
+
+    """
+    ==========
+    SCHEDULING
+    ==========
+    """
     # spark.locality.wait
-    manipulator.add_parameter(EnumParameter('spark_locality_wait', ['2s', '3s', '6s']))
+    manipulator.add_parameter(EnumParameter('spark_locality_wait', ['1s', '3s', '10s']))
     # spark.locality.wait.node
-    manipulator.add_parameter(EnumParameter('spark_locality_wait_node', ['2s', '3s', '6s']))
+    manipulator.add_parameter(EnumParameter('spark_locality_wait_node', ['1s', '3s', '10s']))
     # spark.locality.wait.process
     manipulator.add_parameter(EnumParameter('spark_locality_wait_process', ['2s', '3s', '6s']))
     # spark.locality.wait.rack
@@ -324,17 +363,23 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.scheduler.maxRegisteredResourcesWaitingTime
     manipulator.add_parameter(EnumParameter('spark_scheduler_maxregisteredresourceswaitingtime', ['15s', '30s', '60s']))
     # spark.scheduler.minRegisteredResourcesRatio
+    manipulator.add_parameter(EnumParameter('spark_scheduler_minregisteredresourcesratio', ['0.5', '0.8', '1.0'])) # a ratio, not a duration
     # spark.scheduler.mode
     manipulator.add_parameter(EnumParameter('spark_scheduler_mode', ['FIFO', 'FAIR']))
     # spark.scheduler.revive.interval
     manipulator.add_parameter(EnumParameter('spark_scheduler_revive_interval', ['1s', '3s']))
     # spark.scheduler.listenerbus.eventqueue.capacity
-    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_capacity', ['5000', '10000', '20000']))
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.listenerbus.eventqueue.shared.capacity
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_shared_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.listenerbus.eventqueue.appStatus.capacity
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_appstatus_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.listenerbus.eventqueue.executorManagement.capacity
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_executormanagement_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.listenerbus.eventqueue.eventLog.capacity
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_eventlog_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.listenerbus.eventqueue.streams.capacity
+    manipulator.add_parameter(EnumParameter('spark_scheduler_listenerbus_eventqueue_streams_capacity', ['1000', '10000', '50000']))
     # spark.scheduler.blacklist.unschedulableTaskSetTimeout
     manipulator.add_parameter(EnumParameter('spark_scheduler_blacklist_unschedulabletasksettimeout', ['60s', '120s', '240s']))
     # spark.blacklist.enabled
@@ -351,69 +396,85 @@ class GccFlagsTuner(MeasurementInterface):
     manipulator.add_parameter(IntegerParameter('spark_blacklist_stage_maxfailedexecutorspernode', 2, 4))
     # spark.blacklist.application.maxFailedTasksPerExecutor
     manipulator.add_parameter(IntegerParameter('spark_blacklist_application_maxfailedtasksperexecutor', 2, 4))
-    # spark.blacklist.timeout
     # spark.blacklist.application.maxFailedExecutorsPerNode
     manipulator.add_parameter(IntegerParameter('spark_blacklist_application_maxfailedexecutorspernode', 2, 4))
-    # spark.blacklist.timeout
     # spark.blacklist.killBlacklistedExecutors
     manipulator.add_parameter(EnumParameter('spark_blacklist_killblacklistedexecutors', ['true', 'false']))
-    # spark.blacklist.application.fetchFailure.enabled
     # spark.speculation
     manipulator.add_parameter(EnumParameter('spark_speculation', ['true', 'false']))
     # spark.speculation.interval
-    manipulator.add_parameter(EnumParameter('spark_speculation_interval', ['50ms', '100ms', '200ms']))
+    manipulator.add_parameter(EnumParameter('spark_speculation_interval', ['10ms', '100ms', '500ms']))
     # spark.speculation.multiplier
-    manipulator.add_parameter(EnumParameter('spark_speculation_multiplier', ['1.1', '1.5', '2']))
+    manipulator.add_parameter(EnumParameter('spark_speculation_multiplier', ['1.1', '1.5', '5']))
     # spark.speculation.quantile
     manipulator.add_parameter(EnumParameter('spark_speculation_quantile', ['0.5','0.75', '0.85']))
-    # spark.speculation.task.duration.threshold
     # spark.task.cpus
     manipulator.add_parameter(IntegerParameter('spark_task_cpus', 1, 3))
-    # spark.task.resource.{resourceName}.amount
-    # spark.executor.resource.{resourceName}.amount
     # spark.task.maxFailures
     manipulator.add_parameter(EnumParameter('spark_task_maxfailures', ['2', '4', '8']))
     # spark.task.reaper.enabled
     manipulator.add_parameter(EnumParameter('spark_task_reaper_enabled', ['true', 'false']))
-    # spark.task.reaper.*
     # spark.task.reaper.pollingInterval
     manipulator.add_parameter(EnumParameter('spark_task_reaper_pollinginterval', ['5s', '10s', '20s']))
     # spark.task.reaper.threadDump
     manipulator.add_parameter(EnumParameter('spark_task_reaper_threaddump', ['true', 'false']))
-    # spark.task.reaper.killTimeout
     # spark.stage.maxConsecutiveAttempts
     manipulator.add_parameter(EnumParameter('spark_stage_maxconsecutiveattempts', ['2', '4', '8']))
+
+    # NOT USED
+    # spark.speculation.task.duration.threshold
+    # spark.cores.max
+    # spark.blacklist.application.fetchFailure.enabled
+    # spark.task.resource.{resourceName}.amount
+    # spark.executor.resource.{resourceName}.amount
+    # spark.task.reaper.killTimeout
+
+    """
+    ======================
+    BARRIER EXECUTION MODE
+    ======================
+    """
     # spark.barrier.sync.timeout
     # spark.scheduler.barrier.maxConcurrentTasksCheck.interval
     # spark.scheduler.barrier.maxConcurrentTasksCheck.maxFailures
+    """
+    ==================
+    DYNAMIC ALLOCATION
+    ==================
+    """
     # spark.dynamicAllocation.enabled
-    # spark.shuffle.service.enabled
-    # spark.dynamicAllocation.shuffleTracking.enabled
-    # spark.dynamicAllocation.minExecutors
-    # spark.dynamicAllocation.maxExecutors
-    # spark.dynamicAllocation.initialExecutors
-    # spark.dynamicAllocation.executorAllocationRatio
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_enabled', ['true', 'false']))
     # spark.dynamicAllocation.executorIdleTimeout
-    # spark.dynamicAllocation.cachedExecutorIdleTimeout
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_executoridletimeout', ['10s', '60s', '240s']))
     # spark.dynamicAllocation.initialExecutors
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_initialexecutors', ['0', '1', '4']))
     # spark.dynamicAllocation.minExecutors
-    # spark.dynamicAllocation.maxExecutors
-    # spark.dynamicAllocation.minExecutors
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_minexecutors', ['0', '4', '10']))
     # spark.dynamicAllocation.executorAllocationRatio
-    # spark.dynamicAllocation.minExecutors
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_executorallocationratio', ['0.1', '0.5', '1']))
+    # spark.dynamicAllocation.shuffleTracking.enabled
+    manipulator.add_parameter(EnumParameter('spark_dynamicallocation_shuffletracking_enabled', ['true', 'false']))
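+    # Note: dynamic allocation normally also requires either the external
+    # shuffle service (tuned above) or shuffleTracking to be enabled.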
+
+    # NOT USED
+    # spark.dynamicAllocation.cachedExecutorIdleTimeout
     # spark.dynamicAllocation.maxExecutors
     # spark.dynamicAllocation.schedulerBacklogTimeout
     # spark.dynamicAllocation.sustainedSchedulerBacklogTimeout
-    # spark.dynamicAllocation.schedulerBacklogTimeout
-    # spark.dynamicAllocation.shuffleTracking.enabled
     # spark.dynamicAllocation.shuffleTracking.timeout
+
+    """
+    =====================
+    THREAD CONFIGURATIONS
+    =====================
+    """
     # spark.{driver|executor}.rpc.netty.dispatcher.numThreads
-    # spark.{driver|executor}.rpc.io.serverThreads
-    # spark.rpc.io.serverThreads
     # spark.{driver|executor}.rpc.io.clientThreads
-    # spark.rpc.io.clientThreads
     # spark.{driver|executor}.rpc.netty.dispatcher.numThreads
-    # spark.rpc.netty.dispatcher.numThreads
+    """
+    =========
+    SPARK SQL
+    =========
+    """
     # spark.sql.adaptive.advisoryPartitionSizeInBytes
     # spark.sql.adaptive.shuffle.targetPostShuffleInputSize
     # spark.sql.adaptive.coalescePartitions.enabled
@@ -541,6 +602,11 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.sql.thriftserver.ui.retainedSessions
     # spark.sql.thriftserver.ui.retainedStatements
     # spark.sql.variable.substitute
+    """
+    ========================
+    STATIC SQL CONFIGURATION
+    ========================
+    """
     # spark.sql.event.truncate.length
     # spark.sql.extensions
     # spark.sql.hive.metastore.barrierPrefixes
@@ -557,6 +623,11 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.sql.streaming.ui.retainedQueries
     # spark.sql.ui.retainedExecutions
     # spark.sql.warehouse.dir
+    """
+    ===============
+    SPARK STREAMING
+    ===============
+    """
     # spark.streaming.backpressure.enabled
     # spark.streaming.receiver.maxRate
     # spark.streaming.backpressure.initialRate
@@ -570,6 +641,11 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.streaming.ui.retainedBatches
     # spark.streaming.driver.writeAheadLog.closeFileAfterWrite
     # spark.streaming.receiver.writeAheadLog.closeFileAfterWrite
+    """
+    ======
+    SPARKR
+    ======
+    """
     # spark.r.numRBackendThreads
     # spark.r.command
     # spark.r.driver.command
@@ -577,45 +653,33 @@ class GccFlagsTuner(MeasurementInterface):
     # spark.r.shell.command
     # spark.r.backendConnectionTimeout
     # spark.r.heartBeatInterval
+    """
+    ======
+    GRAPHX
+    ======
+    """
     # spark.graphx.pregel.checkpointInterval
+    """
+    ======
+    DEPLOY
+    ======
+    """
     # spark.deploy.recoveryMode
     # spark.deploy.zookeeper.url
     # spark.deploy.zookeeper.dir
-    # spark.pyspark.python
-    # spark.pyspark.driver.python
-    # spark.r.shell.command
-    # spark.driver.resource.{resourceName}.amount
 
-    #  
-    #  spark.driver.maxResultSize
-	
-#     manipulator.add_parameter(EnumParameter('default_parall', ['8', '25', '50']))# ## ## ##  caution -> default= num of cores in all machines
-#     manipulator.add_parameter(EnumParameter('shuffle_part', ['100', '200', '400']))
     return manipulator
 
   def run(self, desired_result, input, limit):
     """
     Compile and run a given configuration then
     return performance
-    """  
-    run_cmd = 'rm out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0 
-
+    """
     cfg = desired_result.configuration.data
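+    # cfg maps every parameter registered in manipulator() to the value chosen
+    # for this trial; each selected property is appended to the "out" config
+    # file below through shell echo commands.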
 
-    if cfg['serializer'] != 'java':
-       run_cmd = 'echo "spark.serializer		"'
-       run_cmd += str(cfg['serializer'])
-       run_cmd += ' >> out'
-       run_result = self.call_program(run_cmd)
-       assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.reducer.maxSizeInFlight		"'
-    run_cmd += str(cfg['reducer_max_flight'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0 
+    # run_cmd = 'rm out'
+    # run_result = self.call_program(run_cmd)
+    # assert run_result['returncode'] == 0
 
     run_cmd = 'echo "spark.shuffle.file.buffer		"'
     run_cmd += str(cfg['spark_shuffle_file_buffer'])
@@ -780,7 +844,7 @@ class GccFlagsTuner(MeasurementInterface):
     assert run_result['returncode'] == 0
 
     run_cmd = 'echo "spark.memory.storageFraction		"'
-    run_cmd += str(cfg['memory_storage_fraction'])
+    run_cmd += str(cfg['spark_memory_storagefraction'])
     run_cmd += ' >> out'
     run_result = self.call_program(run_cmd)
     assert run_result['returncode'] == 0
@@ -855,78 +919,6 @@ class GccFlagsTuner(MeasurementInterface):
     run_result = self.call_program(run_cmd)
     assert run_result['returncode'] == 0
 
-    run_cmd = 'echo "spark.ui.dagGraph.retainedRootRDDs		"'
-    run_cmd += str(cfg['spark_ui_daggraph_retainedrootrdds'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.enabled		"'
-    run_cmd += str(cfg['spark_ui_enabled'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.liveUpdate.period		"'
-    run_cmd += str(cfg['spark_ui_liveupdate_period'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.retainedJobs		"'
-    run_cmd += str(cfg['spark_ui_retainedjobs'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.retainedStages		"'
-    run_cmd += str(cfg['spark_ui_retainedstages'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.retainedTasks		"'
-    run_cmd += str(cfg['spark_ui_retainedtasks'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.worker.ui.retainedExecutors		"'
-    run_cmd += str(cfg['spark_worker_ui_retainedexecutors'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.worker.ui.retainedDrivers		"'
-    run_cmd += str(cfg['spark_worker_ui_retaineddrivers'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.sql.ui.retainedExecutions		"'
-    run_cmd += str(cfg['spark_sql_ui_retainedexecutions'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.streaming.ui.retainedBatches		"'
-    run_cmd += str(cfg['spark_streaming_ui_retainedbatches'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.retainedDeadExecutors		"'
-    run_cmd += str(cfg['spark_ui_retaineddeadexecutors'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
-    run_cmd = 'echo "spark.ui.requestHeaderSize		"'
-    run_cmd += str(cfg['spark_ui_requestheadersize'])
-    run_cmd += ' >> out'
-    run_result = self.call_program(run_cmd)
-    assert run_result['returncode'] == 0
-
     run_cmd = 'echo "spark.kryo.unsafe		"'
     run_cmd += str(cfg['spark_kryo_unsafe'])
     run_cmd += ' >> out'
@@ -1139,7 +1131,7 @@ class GccFlagsTuner(MeasurementInterface):
 
     run_cmd = 'echo "spark.task.cpus		"'
     cpu = cfg['spark_task_cpus']
-    if cfg['spark_task_cpus'] > cfg['spark_executor_cores']:
+    if int(cfg['spark_task_cpus']) > int(cfg['spark_executor_cores']):
         cpu = cfg['spark_executor_cores']
     run_cmd += str(cpu)
     run_cmd += ' >> out'
@@ -1170,64 +1162,202 @@ class GccFlagsTuner(MeasurementInterface):
     run_result = self.call_program(run_cmd)
     assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.am.memory		"'
-    # run_cmd += str(cfg['yarn_am_mem'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    """
+    NEW METRICS
+    """
+    run_cmd = 'echo "spark.executor.pyspark.memory		"'
+    run_cmd += str(cfg['spark_executor_pyspark_memory'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.am.cores		"'
-    # run_cmd += str(cfg['yarn_am_cores'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.reducer.maxSizeInFlight    "'
+    run_cmd += str(cfg['spark_reducer_maxsizeinflight'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.scheduler.heartbeat.interval-ms		"'
-    # run_cmd += str(cfg['yarn_heartbeat_interval'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.shuffle.service.enabled    "'
+    run_cmd += str(cfg['spark_shuffle_service_enabled'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.scheduler.initial-allocation.interval		"'
-    # run_cmd += str(cfg['yarn_initial_interval'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.shuffle.service.index.cache.size    "'
+    run_cmd += str(cfg['spark_shuffle_service_index_cache_size'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.containerLauncherMaxThreads		"'
-    # run_cmd += str(cfg['yarn_container_threads'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.checkpoint.compress    "'
+    run_cmd += str(cfg['spark_checkpoint_compress'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.submit.waitAppCompletion		"'
-    # run_cmd += str(cfg['yarn_wait_completion'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.io.compression.zstd.level    "'
+    run_cmd += str(cfg['spark_io_compression_zstd_level'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "spark.yarn.blacklist.executor.launch.blacklisting.enabled		"'
-    # run_cmd += str(cfg['yarn_spark_blacklist_enabled'])
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.io.compression.zstd.buffersize    "'
+    run_cmd += str(cfg['spark_io_compression_zstd_buffersize'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-    # run_cmd = 'echo "hibench.yarn.executor.num		"'
-    # num_exec = (48)/int(cfg['spark_executor_cores']) # the divisions are floored -> FAT NODE
-    # # num_exec = num_exec -1
-    # run_cmd += str(num_exec)
-    # run_cmd += ' >> out'
-    # run_result = self.call_program(run_cmd)
-    # assert run_result['returncode'] == 0
+    run_cmd = 'echo "spark.kryo.referenceTracking    "'
+    run_cmd += str(cfg['spark_kryo_referencetracking'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.kryo.registrationrequired    "'
+    run_cmd += str(cfg['spark_kryo_registrationrequired'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.serializer    "'
+    run_cmd += str(cfg['spark_serializer'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.broadcast.checksum    "'
+    run_cmd += str(cfg['spark_broadcast_checksum'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.default.parallelism    "'
+    run_cmd += str(cfg['spark_default_parallelism'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.files.useFetchCache    "'
+    run_cmd += str(cfg['spark_files_usefetchcache'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.executor.metrics.pollingInterval   "'
+    run_cmd += str(cfg['spark_executor_metrics_pollinginterval'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.python.worker.memory   "'
+    run_cmd += str(cfg['spark_python_worker_memory'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.python.worker.reuse   "'
+    run_cmd += str(cfg['spark_python_worker_reuse'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.memory.storageFraction   "'
+    run_cmd += str(cfg['spark_memory_storagefraction'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.minRegisteredResourcesRatio    "'
+    run_cmd += str(cfg['spark_scheduler_minregisteredresourcesratio'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.listenerbus.eventqueue.shared.capacity	    "'
+    run_cmd += str(cfg['spark_scheduler_listenerbus_eventqueue_shared_capacity'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.listenerbus.eventqueue.appStatus.capacity    "'
+    run_cmd += str(cfg['spark_scheduler_listenerbus_eventqueue_appstatus_capacity'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.listenerbus.eventqueue.executorManagement.capacity	    "'
+    run_cmd += str(cfg['spark_scheduler_listenerbus_eventqueue_executormanagement_capacity'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.listenerbus.eventqueue.streams.capacity	    "'
+    run_cmd += str(cfg['spark_scheduler_listenerbus_eventqueue_streams_capacity'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.scheduler.listenerbus.eventqueue.eventLog.capacity		    "'
+    run_cmd += str(cfg['spark_scheduler_listenerbus_eventqueue_eventlog_capacity'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.enabled    "'
+    run_cmd += str(cfg['spark_dynamicallocation_enabled'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.executor.instances    "'
+    run_cmd += str(cfg['spark_executor_instances'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.enabled    "'
+    run_cmd += str(cfg['spark_dynamicallocation_enabled'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.executorIdleTimeout   "'
+    run_cmd += str(cfg['spark_dynamicallocation_executoridletimeout'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.initialExecutors   "'
+    run_cmd += str(cfg['spark_dynamicallocation_initialexecutors'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.minExecutors   "'
+    run_cmd += str(cfg['spark_dynamicallocation_minexecutors'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.executorAllocationRatio   "'
+    run_cmd += str(cfg['spark_dynamicallocation_executorallocationratio'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
+
+    run_cmd = 'echo "spark.dynamicAllocation.shuffleTracking.enabled   "'
+    run_cmd += str(cfg['spark_dynamicallocation_shuffletracking_enabled'])
+    run_cmd += ' >> out'
+    run_result = self.call_program(run_cmd)
+    assert run_result['returncode'] == 0
 
-# 32 to 120
     run_cmd = 'echo "spark.executor.memory		"'
-    exec_mem = float(cfg['spark_executor_memory'])/num_exec		# the divisions are floored
-    exec_mem = exec_mem - 0.1 * exec_mem    # -> 10% memory overhead
-    exec_mem = exec_mem * 1000
-    exec_mem = int(exec_mem)
-    run_cmd += str(exec_mem)
-    run_cmd += 'm'
+    # exec_mem = float(cfg['spark_executor_memory'])/4		# the divisions are floored
+    # exec_mem = exec_mem - 0.1 * exec_mem    # -> 10% memory overhead
+    # exec_mem = exec_mem * 1000
+    # exec_mem = int(exec_mem)
+    run_cmd += str(cfg['spark_executor_memory'])
     run_cmd += ' >> out'
     run_result = self.call_program(run_cmd)
     assert run_result['returncode'] == 0
@@ -1246,4 +1376,4 @@ class GccFlagsTuner(MeasurementInterface):
 
 if __name__ == '__main__':
   argparser = opentuner.default_argparser()
-  GccFlagsTuner.main(argparser.parse_args())
+  GccFlagsTuner.main(argparser.parse_args())