Spark connection and management

Architecture overview

Apache Spark is the recommended and default federation engine used by Querona.

The are two primary models of working with Spark:

  • hosting a local instance managed by Querona (configured by default during installation),

  • connecting to an existing cluster (e.g. Azure HDInsight).

The basic concept here is the Driver which is a Java application acting as a proxy for delegating incoming requests to the Spark execution engine.

The driver is bundled in a JAR file and then submitted to the cluster using spark-submit.

Once started, the driver starts an in-process Thrift/Http server on the given port and starts listening for incoming connections from Querona.

Incoming requests are authenticated using an API-key. Additionally, the driver opens the reverse connections to Querona via TDS, which is authenticated using a username/password pair configured on the Spark connection level.

The driver starts in a “NotInitialized” state which means it doesn’t try to connect to Spark until the first request is made. Most parameters are configurable on the connection level and will be sent over-the-wire during the first request for data. The actual details differ depending on the connection model used.

Local Spark instance (managed)

Introduction

In this model, Spark starts as a child (although independent) JVM process of Querona main process. This is achieved via programmatic invoke of spark-submit together with passing the driver JAR file as the primary argument. Other arguments, such as port number are passed as command line arguments as well. At this point, a new Java application is started, however, Spark itself is not initialized yet. For this to happen the first connection must be established so the Spark parameters can be passed along.

Connecting to Spark

For general information regarding connections see Create a connection.

Querona comes with a single pre-configured local Spark connection.

The important thing to note here is that Spark/Hive configurations are set on different levels. Except for standard environment parameters, a small bit of configuration is stored in config files (see: Advanced configuration) and most on the connection level. At runtime, the configurations are merged in this exact order, which means that if a given parameter is defined in a configuration file and then defined again on the connection level, the former one will be used.

When adding a new connection the following parameters apply:

Parameter

Description

Default value

Name

A friendly name of the connection e.g. Local Spark 2.1

Host:Port

A comma-separated list of server:port pairs to connect to, see: Cluster failover. For local instance use localhost:port.

localhost:8400

Protocol

The protocol that the driver will use to exchange data with Querona. Possible values: Thrift, http

Thrift

Spark dialect

(= protocol version). The dialect version used to communicate with the driver. Possible values: 1.6, 2.0

Driver API key

The security key used to authenticate requests incoming from Querona. Using the default value is highly NOT recommended.

querona-key

Username

The account to be used for authenticating reverse connections from the driver

Password

The password for the account above

Default database

The database used for storing data for cached views

Reverse connection host address

The address under which Querona can be reached by the driver. For local instance use localhost

Node connection timeout

The timeout used for waiting for each cluster node to respond. See: Cluster failover

10s

Cluster initialization timeout

The timeout given for the instance to initialize when doing the initial request

30s

Frame size

Frame size used when retrieving query results from Spark

16777216 bytes

Target partition size

See: Partitioning strategy

128000000 bytes

Max partitions for auto repartitioning

See: Partitioning strategy

256

Scheduler pools

To learn about Spark scheduler pools in general, please consult Spark Job Scheduling.

Scheduler pools describe relative part of cluster’s resources that will be assigned to process a given task. Querona’s Spark connections support 6 scheduler pools:

  • default - scheduler pool used in case no scheduler pool configuration is provided

  • ETL - scheduler pool used by cache load tasks, executes job in FIFO order

  • S, M, L, XL - 4 pools that user can specify for their queries with ascending allocation of resources

To specify pool to be used for given query use the OPTION clause. For example:

SELECT * FROM Table OPTION(POOL 'XL')]

or

INSERT INTO TABLE(COLUMN) select 1 OPTION(POOL 'S')

Exact configuration values for each scheduler pool can be reviewed and configured on Spark connection screen:

Spark scheduler pools

The meaning of the pool properties is a follows:

  • Mode: FIFO means tasks are queued one after the other. FAIR means that tasks can run in parallel and share resources.

  • Weight: Decides how much of cluster’s resources will this pool get relative to other pools.

  • Min share: Share of cluster’s resources assigned by FAIR scheduler (if possible) before allocation of rest of resources takes place.

You can review pools used by spark for each query in Spark’s admin portal, just go to tab ‘Stages’:

Spark stages

Spark variables

Any properties prefixed with spark can be defined in this section and will be used when creating a new Spark context. Several of them have been predefined by the Querona team with the most noticeable being:

Parameter

Description

Default value

spark.ui.port

The Spark UI port

4040

spark.local.dir

The local working directory of Spark

C:\ProgramData\Querona\tmp

Hive configuration

Under the hood, Spark uses Hive as a query execution engine. One of the important properties here is the warehouse directory i.e. a file system folder used as physical data storage.

Depending on version of Spark used, this parameter is set by either/or hive.metastore.warehouse.dir and spark.sql.warehouse.dir. These values are set in the config files (see: Advanced configuration) but can be passed on the connection level as well.

We highly recommend to always consult the documentation of Spark to make sure the right values are set. Excerpt from the Apache Spark v2.2 documentation:

Note

Note that the hive.metastore.warehouse.dir property in hive-site.xml is deprecated since Spark 2.0.0. Instead, use spark.sql.warehouse.dir to specify the default location of the database in a warehouse. You may need to grant write privilege to the user who starts the Spark application.

For the local instance the following parameters are also important:

Parameter

Description

Default value

javax.jdo.option.ConnectionURL

JDBC connection string to the Hive metastore

jdbc:sqlserver://localhost;databaseName=SparkMetastore;create=true;

javax.jdo.option.ConnectionUserName

JDBC login to the Hive metastore

javax.jdo.option.ConnectionPassword

JDBC password to the Hive metastore

hive.exec.scratchdir

C:/ProgramData/Querona/tmp/hive

Hive tmp folder

Connection test and Spark validation

Once configured the Test Connection button can be used to verify whether the instance is responsive. Note, that this only test if the given driver instance responds to incoming connections. To fully test the configuration, the following steps are one of the methods to trigger and check Spark initialization:

  • Using the Querona DATABASES menu create a new database named “spark-test-db” linked to the newly created connection

Create Spark db
  • On the newly created database add new view of type Custom SQL named “test_view”:

Create Spark view
  • On the next screen type select 1 as col; and click Parse

Create Spark view
  • Finish creating the view

  • Click the view on the list and go to Caching and Partitioning, click Edit

  • Set Caching mode to Persistent caching - one table and Save

Create Spark view
  • Go to Management tasks and run Rebuild persistent cache task

Create Spark view
  • Go to query editor and run the following query on the Spark database:

select [col]
from [spark-test-db].[dbo].[test_view]
union all
select [col]
from [spark-test-db].[dbo].[test_view]
  • The result should look more or less like this:

Create Spark view

These actions should force Spark to initialized. To verify it change the URL in the browser from e.g. http://querona-example:9000 to http://querona-example:4040 where 4040 is the Spark GUI port set in spark.ui.port variable (4040 is the default value).

Spark GUI should open and you should see the query executed:

Local Spark

You can click on the tabs to see different jobs and parameters which might be useful for future debugging and optimization.

Partitioning strategy

When JOIN queries are pushed-down to execute on Spark, the data from the original database has to be copied to Spark. Querona registers external data sources as JDBC data frames, optionally applying repartition(n) function invocation. If the data frame is too large it has to be broken into smaller chunks. The Target partition size (bytes) parameter defines the target size of these chunks. The default value 128 MiB is recommended.

The second parameter Max partitions for auto repartitioning (default: 256) defines the upper limit for the number of partitions to be created.

Local Instance Configuration

Local instance management options can be found under the Administer ‣ Local Spark Instances.

Main menu

Querona ships with a single pre-configured instance of Spark.

The Refresh button runs a state check to ensure the State column holds the latest information. The view has not auto-refreshing functionality so it might be necessary to use it a few times during state transition.

The Add button allows configuring a new local instance.

Clicking on a given row brings up the details blade with all parameters shown in read-only mode.

Details menu

Refresh button works the same way as in the previous screen.

Start, Stop and Kill buttons are used for controlling the instance state.

The difference between Stop and Kill is that the former sends a remote “self-shutdown” command allowing the instance to free all used resources and close itself softly. This process may take up to several minutes. Kill, on the other hand, sends an OS-level kill signal which should terminate the underlying JVM process (not recommended unless necessary).

You can run as many instances as you want at the same time as long as they use different ports.

Parameters

The following values can be configured for a given instance:

Parameter

Description

Default value

Name

A human-friendly name of the instance e.g. Spark 2.1

Port

The port that the driver will open to communicate with Querona.

8400

Protocol

The protocol that the driver will use to exchange data with Querona. Possible values: Thrift, http

Thrift

Spark dialect

(= protocol version). The dialect version used to communicate with the driver. Possible values: 1.6, 2.0

Driver API key

The security key used to authenticate incoming requests - this must match the key given when configuring the connection to Spark Using the default value is highly NOT recommended.

querona-key

Driver version

The version of the driver to be deployed to the instance - should match the used spark version.

2.1.0

SPARK_HOME

The root directory of the used Spark distribution.

%PROGRAMDATA%\Querona\lib\spark\spark-2.1.0-bin-hadoop2.6 |

SPARK_CONF_DIR

The directory where Spark configuration is being stored

%PROGRAMDATA%\Querona\conf\spark

Log directory

The directory where Spark log files are being generated. No spaces allowed. See also: Disable Log Tracing

%PROGRAMDATA%\Querona\logs

Spark master

Spark master URL

local[*]

Driver memory

Amount of memory to use for the driver process, i.e. where SparkContext is initialized.

8g

Executor memory

Amount of memory to use per executor process

8g

Total executor cores

The number of cores to use on each executor

4

_JAVA_OPTIONS

Value of _JAVA_OPTIONS variable picked up by Spark. Note: setting only “-Xms1G” might trigger the following startup error: “‘Initial’ is not recognized as an internal or external command”

Autostart

Sets whether the given instance should be started together with Querona.

False

Persistent

Sets whether the given instance should be left alive even the master Querona process is stopped.

False

Enable Log Tracing

Checking this flag will cause Querona to start tracing and copying Spark log files to Querona. It’s useful when using a logging engine such as SEQ to have all logs in one place.

False

Advanced configuration

Additionally Querona stores a few configuration files in C:ProgramDataQueronaconfspark (default).

File

Description

core-site.xml

Querona-specific HDFS configuration

hive-site.xml

Querona-specific Hive configuration

querona-site.xml

Querona configuration parameters

spark-env.cmd

Spark environment variables configuration

Standalone cluster

Introduction

For more resource consuming operations it makes sense to use a preconfigured standalone cluster. Once setup you should configure a new Local instance except for the Spark master parameter should be set to the proper URL e.g. spark://HOST:PORT.

HDInsight

Introduction

If you don’t have a running Spark cluster already in place then Microsoft HDInsight might be the solution to quickly get one running.

Querona fully supports HDInsight as the data federation engine.

HDInsight configuration

To add a new cluster, you may use the Azure Portal:

Azure menu

The following table summarizes the important parameters to be defined:

Parameter

Description

Cluster login username

Login that will be used to access Using Ambari

Cluster login password

Password that will be used to access Using Ambari

Secure Shell (SSH) username

See: Advanced management / troubleshooting

Cluster type

Use “Spark”

Version

Spark version - Querona supports Spark from 1.6.2 upwards

It is important to note that the communication between HDI and Querona is required both ways i.e. Querona must be able to reach the driver on the querona.driver.port value (default: 8400) and the driver must be able to reach Querona on the TDS port (default: 1200). To make it work, the HDI cluster should be placed a proper VLAN meeting these requirements.

Azure menu

Driver deployment

(further instructions assume that the HDInsight cluster is already configured and running).

The basic idea is similar to Spark Local instance, except the driver is deployed using Azure Portal on all (two) HDInsight head nodes (with failover enabled by default) and submitted with Spark master = master yarn which means that instead of running a new instance we are attaching to the running YARN process.

Building the tarball

Uploading the driver is done using the HDInsight Script Action mechanism. To do this the driver must be packed in a .tar.gz file and uploaded to an arbitrary publicly visible Azure blob.

The necessary tools and folder structure can be found in the %QUERONA_INSTALL_FOLDER%\HDInsight\version directory. We recommend copying this folder a local-user location to avoid problems with Windows elevated privileges.

The important step to be done at this point it configuring the primary driver parameters.

Open querona\querona-site.xml with any text editor. The following values are particularly important:

Parameter

Description

querona.api.key

must match the value entered later when creating a connection to the cluster; shouldn’t be left with the default value

querona.driver.port

The port the driver will listen on (default: 8400)

querona.driver.protocol

The connection protocol - possible values: thrift, http

querona.ha.enable

See: Cluster failover

Additionally a second file log4j.properties contains the Log4J configuration for the Querona driver, which can be tweaked as well. By default, it logs all data into *./logs* directory separately for each HDInsight head node with TRACE level. Contrary to the local instance, the logs are not streamed back to the master Querona instance in any way and thus the only way to access them is logging directly to the given node e.g. by SSH (see: Advanced management / troubleshooting).

Once done, the Build-Tarball.cmd scripts use 7Zip to the tarball and gzip all necessary files. The following line should be adjusted if necessary: set ZIP_EXE=”C:Program Files7-Zip7z.exe”

If everything went fine, a new file like querona21.tar.gz should appear. Now this file together with install-querona-driver-21.sh must be uploaded to a public Azure blob. You can either do it manually using a tool such as Microsoft Azure Storage Explorer or use the provided Build-Tarball-and-Upload.ps1 which runs the Build-Tarball.cmd and then uses the Azure Powershell to upload both files (installation required).

At this point you either have to use an existing or create a new Azure Storage Account using Azure Portal:

Azure menu

Once created, find the new resource to obtain the StorageAccountName and StorageAccountKey:

Azure menu

Edit the Build-Tarball-and-Upload.ps1 and copy-paste the values to the correct lines of the file. $ContainerName can be changed as well. Once run (using Windows Powershell ISE) a progress bar might appear and after several second the outcome should be more or less like the following:

Azure Powershell

The two highlighted URLs are the final locations of the files. You can copy-paste them in a web browser to make sure they are publicly available (they can be removed after the deployment process).

Running the installation script

On the HDI cluster management screen please find the Script actions button.

Azure menu

Then click Submit new

Script action

You can see the previous submissions as well - the Resubmit option makes it quite convenient to run the same script again in case you need to e.g. change the parameters.

The following table summarizes the inputs

Parameter

Description

Name

A user-friendly name of the script e.g. qdriver202-build-1445

Bash script URI

URL pointing the install-querona-driver-21.sh the script generated above e.g. https://test-server.blob.core.windows.net/test/install-querona-driver-21.sh

Head

Should the script run on head nodes? Check

Worker

Should the script run on worker nodes? Uncheck

Zookeeper

Should the script run on zookeeper nodes? Uncheck

Parameters

Script arguments - see below

Persist this script action

This only applies to worker nodes (Uncheck)

Script arguments should be passed as follows: - the URL of the .tar.gz generated above - spark-submit arguments passed in quotes

So the sample field of this value could be https://test-server.blob.core.windows.net/test/querona21.tar.gz “–driver-memory 4g –executor-memory 4g –executor-cores 1”

Clicking Create should submit the script for execution. Usually, it takes a few minutes for the script to execute. Failures usually occur faster.

Successful execution doesn’t always mean everything went fine.. Go through the next chapter to verify.

Using Ambari

One of the properties on the cluster Dashboard is the URL e.g. https://test-server.azurehdinsight.net. Clicking this URL opens Ambari - a graphical environment used for monitoring the cluster.

Apart from many different functions, you can also monitor the execution of submitted Script actions. They can be accessed during and after execution by clicking in the upper right menu:

Ambari

This brings up the Background operations window

Ambari

Drilling down will allow to access the execution logs for every requested cluster node (both head nodes in this case):

Ambari

If everything went fine the logs for every head node should look more or less like the following:

stderr:

--2017-04-25 09:08:37--  https://test-server.blob.core.windows.net/test/querona21.tar.gz
Resolving test-server.blob.core.windows.net (test-server.blob.core.windows.net)... 13.79.176.56
Connecting to test-server.blob.core.windows.net (test-server.blob.core.windows.net)|13.79.176.56|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 5602881 (5.3M) [application/octet-stream]
Saving to: /tmp/querona/querona21.tar.gz
0K .......... .......... .......... .......... ..........  0% 1.11M 5s
50K .......... .......... .......... .......... ..........  1%  144M 2s
100K .......... .......... .......... .......... ..........  2% 2.19M 2s
.
.
.
5300K .......... .......... .......... .......... .......... 97%  178M 0s
5350K .......... .......... .......... .......... .......... 98%  110M 0s
5400K .......... .......... .......... .......... .......... 99%  169M 0s
5450K .......... .......... .                               100%  250M=0.3s
2017-04-25 09:08:37 (17.2 MB/s) - /tmp/querona/querona21.tar.gz saved [5602881/5602881]
Starting |Product| install script on hn0-hdi201.pvj0tgv2cf4effqlnwp40uhvoa.ax.internal.cloudapp.net, OS VERSION: 16.04
Removing |Product| installation and tmp folder
Downloading |Product| tar file
Unzipping |Product|
querona/
querona/conf/
querona/init/
querona/init/ojdbc6.jar
querona/init/querona.conf
querona/log4j.properties
querona/querona-site.xml
querona/querona.spark.driver-1.0-spark-2.1.0.jar
querona/run-driver.sh
Making |Product| a service and starting
Using systemd configuration
('Start downloading script locally: ', u'https://test-server.blob.core.windows.net/test/install-querona-driver-21.sh')
Fromdos line ending conversion successful
Custom script executed successfully
Removing temp location of the script
Command completed successfully!

The last thing we need to note here is the head node IP addresses we will need them in a minute to connect to the driver. These can be found in the Hosts tab in Ambari:

Ambari

At this point, we can try to add a new Spark connection.

Connecting to HDInsight

The recommended way to start is to the use the predefined connection template called azure_spark_conn_template.

The following table summarizes the differences in connection parameter semantics compared to local instance connection:

Parameter

Description

Default value

Host:Port

CSV-list all local head-node IPs here e.g. 10.9.0.17:8400,10.9.0.18:8400

localhost:8400

Protocol

Must match the one defined in querona-site.xml

Thrift

Spark dialect

It depends on the used HDI version. 2.0 is recommended for new clusters.

Driver API key

Must match the one defined in querona-site.xml

querona-key

Reverse connection host address

An IP address of Querona instance that the driver will use for reverse connections. Must be reachable from both head nodes.

Node connection timeout

The timeout used for waiting for each cluster node to respond. See: Cluster failover

10s

Cluster initialization timeout

Since HDI cluster make take up to a few minutes to boot it’s recommended to increase the default value e.g. to 180s

30s

Scheduler pools

Same as Local Spark instance (managed).

Spark variables

Any properties regarding file system folders shouldn’t be set to avoid errors. spark.ui.port is not used - read below.

Hive configuration

Any properties regarding file system folders shouldn’t be set to avoid errors.

Connection test and Spark validation

Once configured, the same steps as for local instance can be done to ensure Spark was initialized. But since on HDInsight the driver is deployed a YARN client accessing the Spark GUI in done differently:

Under Ambari click Services ‣ Yarn ‣ Quick Links ‣ *any of the nodes* ‣ ResourceManager UI.

Ambari

This will open the YARN UI. Since there’s only one instance per cluster, it doesn’t matter which node you select.

The same UI is also available directly via URL like <https://test-server.azurehdinsight.net/yarnui/hn/cluster>.

If initialization was trigger our application should first appear on the SUBMITTED list and then move to RUNNING:

Ambari

Clicking the ID shows the details screen and from there Tracking URL: ApplicationMaster link leads to the Spark UI:

Ambari

Advanced management / troubleshooting

Using WinSCP to view the folder structure

All HDI nodes are Linux-based machines which enable easy management. As an example, we are using WinSCP. The following screenshot shows a sample connection configuration. The Host name field contains the IP address of the first head node.

WinSCP

Navigating to /usr/share/querona will take you to the installation folder:

WinSCP
  • the logs folder contains the driver log files,

  • querona-site.xml can be edited here as well, but service restart is required,

  • log4j.properties can be used to adjust the logging levels,

Note that we are on a single of two head nodes, so any action must be taken separately for each node. This applies to log files as well.

Using Putty to manage the service

Together with WinSCP, we recommend installing Putty. Then directly from WinSCP we can open a new Putty session and login to the node using CLI.

Starting from HDI 2.0 the driver is run a standard systemd deamon so we can use systemctl to manage the querona service.

e.g. running systemctl status querona should print something like:

querona.service - querona service
   Loaded: loaded (/etc/systemd/system/querona.service; enabled; vendor preset: enabled)
   Active: active (running) since Tue 2017-04-25 09:08:38 UTC; 3h 19min ago
Main PID: 37703 (bash)
   Tasks: 20
   Memory: 112.8M
      CPU: 12.974s
   CGroup: /system.slice/querona.service
           ├─37703 bash /usr/share/querona/run-driver.sh
           └─37708 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Dhdp.version=2.5.1.0-56 -cp /usr/hdp/current/hive-server2/lib/sqljdbc41.jar:/usr/share/querona

Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   Starting conductor
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO AppConductor: HA is enabled, attempting to start failover service and join leader ele
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO AppConductor: Changing NodeState state to: WaitingLeaderResult
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO FailoverService: Starting failover monitoring - path: /querona-leader-election time
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO ActiveStandbyElector: Session connected.
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO ActiveStandbyElector: Successfully created /querona-leader-election in ZK.
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO FailoverService: Joining election, my data: 10.9.0.17:8400
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO QueronaServer: starting thrift server on port 8400
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO AppConductor: Releasing Spark
Apr 25 09:08:44 hn0-hdi201 run-driver.sh[37703]:   17/04/25 09:08:44 INFO AppConductor: Changing NodeState state to: Standby

sudo systemctl restart querona can be used in case of serious problems.

Cluster failover

Querona driver supports automatic failover using Apache Zookeeper.

To make it work the driver has to have at least two instances running and the Zookeeper service must be available. There are two parameters in the querona-site.xml regarding this:

Parameter

Description

querona.ha.enable

Enables/disables failover. Enabled by default in the HDInsight, disabled for local instances. Possible values: true, false.

ha.zookeeper.quorum

A CSV-list of a server:port values of Zookeeper servers. In HDInsight the value is already provided by the environment and is not required.

Once started, all driver instances will participate in a leader election and choose the active node. All other nodes will continue to listen for incoming connection but will transition to StandBy state.

Now when adding a new Spark connection, all servers should be entered in the Host:Port configuration field. Querona will try to find the active node using round-robin. Node connection timeout determines the wait time per each node. If an active node is not found within the Cluster initialization timeout the connection will fail.

If for any reason one of the nodes fails, a new election will start automatically. Any queries that were halted by the node failure, will be rerun automatically once a new active node is found.