Using Spark to Analyze Tabular Data

Learn to use Spark in JupyterLab to analyze UK Biobank tabular data.

Apache Spark is a modern, scalable framework for parallel processing of big data. To analyze tabular data using Spark in JupyterLab, you first need to launch JupyterLab in a Spark cluster configuration. For information on how to use Hail with JupyterLab, see the example notebooks here.

Launching JupyterLab

  • In the platform, select Tools, then JupyterLab.

  • Click New JupyterLab.

  • Type a descriptive title for your JupyterLab session in the Environment Name textbox.

  • Select a project where you want to run JupyterLab.

  • Click Spark Cluster under Cluster Configuration.

  • Select an instance type and number of nodes. This will affect how powerful the Spark cluster will be. The default settings allow for casual interrogation of the data. If you will be running complex queries or analyzing a large amount of data in memory, you may need to select a larger instance type. To increase parallelization efficiency and reduce processing time, you may need to select more nodes.

  • Click Start Environment. A new row will appear and the environment will begin initializing.

  • Once the status becomes Ready, click the environment name to connect to JupyterLab.

  • Inside JupyterLab, select the DNAnexus menu, then select New Notebook.

  • Click the DNAnexus tab on the left and locate the new notebook (Untitled_<DATE>.ipynb).

  • Double-click the notebook name to open it.

  • Select Python 3 as the kernel. You are now ready to begin using the notebook.

Notebook Starting Code

To begin, import relevant Spark and DNAnexus libraries, and instantiate a Spark context and Spark session at the very top of your notebook, as shown below.

import pyspark
import dxpy
import dxdata

# Create a Spark context and session; run this cell only once per JupyterLab session
sc = pyspark.SparkContext()
spark = pyspark.sql.SparkSession(sc)

Ensure that your Spark session is only initialized once per JupyterLab session. If you try to evaluate this cell multiple times (for example, by selecting "Run All Cells" to rerun a notebook after it's already run, or by opening and running multiple notebooks in the same JupyterLab session), you may encounter errors or your notebook may hang. If that happens, you may need to restart the specific notebook's kernel.

As a best practice, shut down the kernel of any notebook you are not using before running a second notebook in the same session.

To improve the reproducibility of your notebooks, and ensure they are portable across projects, it is better not to hardcode any database or dataset names. Instead, you can use the following code to automatically discover the database and dataset:

dispensed_database_name = dxpy.find_one_data_object(
    classname="database", name="app*", folder="/", name_mode="glob", describe=True)["describe"]["name"]
dispensed_dataset_id = dxpy.find_one_data_object(
    typename="Dataset", name="app*.dataset", folder="/", name_mode="glob")["id"]
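
As a quick sanity check, you can print the discovered values:

print(dispensed_database_name)
print(dispensed_dataset_id)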

Accessing the Database Directly Using SQL

  • To evaluate SQL, you can use the spark.sql("...") function, which returns a Spark DataFrame.

  • You can view the contents of a DataFrame (in full width) by calling .show(truncate=False) on it.

The following example lists the tables in the database:

spark.sql("USE " + dispensed_database_name)
spark.sql("SHOW TABLES").show(truncate=False)

Database Tables

The database contains the following tables:

| Table name | Description |
| --- | --- |
| participant_0001, ..., participant_9999 | These tables contain the main UK Biobank participant data. Each participant is represented as one row, and each data-field is represented as one or more columns. For scalability reasons, the data-fields are horizontally split across multiple tables, starting from table participant_0001 (which contains the first few hundred columns for all participants), followed by participant_0002 (which contains the next few hundred columns), etc. The exact number of tables depends on how many data-fields your application is approved for. |
| hesin | Hospitalization records. This table is only included if your application is approved for data-field #41259. |
| hesin_critical | Hospital critical care records. This table is only included if your application is approved for data-field #41290. |
| hesin_delivery | Hospital delivery records. This table is only included if your application is approved for data-field #41264. |
| hesin_diag | Hospital diagnosis records. This table is only included if your application is approved for data-field #41234. |
| hesin_maternity | Hospital maternity records. This table is only included if your application is approved for data-field #41261. |
| hesin_oper | Hospital operation records. This table is only included if your application is approved for data-field #41149. |
| hesin_psych | Hospital psychiatric records. This table is only included if your application is approved for data-field #41289. |
| death | Death records. This table is only included if your application is approved for data-field #40023. |
| death_cause | Death cause records. This table is only included if your application is approved for data-field #40023. |
| gp_clinical | GP clinical event records. This table is only included if your application is approved for data-field #42040. |
| gp_registrations | GP registration records. This table is only included if your application is approved for data-field #42038. |
| gp_scripts | GP prescription records. This table is only included if your application is approved for data-field #42039. |
| covid19_tpp_gp_clinical | GP clinical event records (COVID TPP). This table is only included if your application is approved for data-field #40101. |
| covid19_tpp_gp_scripts | GP prescription records (COVID TPP). This table is only included if your application is approved for data-field #40102. |
| covid19_emis_gp_clinical | GP clinical event records (COVID EMIS). This table is only included if your application is approved for data-field #40103. |
| covid19_emis_gp_scripts | GP prescription records (COVID EMIS). This table is only included if your application is approved for data-field #40104. |
| covid19_result_england | COVID-19 test result records (England). This table is only included if your application is approved for data-field #40100. |
| covid19_result_scotland | COVID-19 test result records (Scotland). This table is only included if your application is approved for data-field #40100. |
| covid19_result_wales | COVID-19 test result records (Wales). This table is only included if your application is approved for data-field #40100. |
| covid19_vaccination | COVID-19 vaccination data. This table is only included if your application is approved for data-field #32040. |
| olink_instance_0 | Olink NPX values for the instance 0 visit. This table is only included if your application is approved for data-field #30900. For scalability reasons, the protein columns are horizontally split across multiple tables, starting from table olink_instance_0_0001 (which contains the first few hundred columns for all participants), followed by olink_instance_0_0002 (which contains the next few hundred columns), etc. The same splitting applies to the olink_instance_2 and olink_instance_3 tables below. |
| olink_instance_2 | Olink NPX values for the instance 2 visit. This table is only included if your application is approved for data-field #30900. |
| olink_instance_3 | Olink NPX values for the instance 3 visit. This table is only included if your application is approved for data-field #30900. |
| omop_condition_era | OMOP Condition Era. This table is only included if your application is approved for data-field #20142. |
| omop_condition_occurrence | OMOP Condition Occurrence. This table is only included if your application is approved for data-field #20142. |
| omop_death | OMOP Death. This table is only included if your application is approved for data-field #20142. |
| omop_device_exposure | OMOP Device Exposure. This table is only included if your application is approved for data-field #20142. |
| omop_dose_era | OMOP Dose Era. This table is only included if your application is approved for data-field #20142. |
| omop_drug_era | OMOP Drug Era. This table is only included if your application is approved for data-field #20142. |
| omop_drug_exposure | OMOP Drug Exposure. This table is only included if your application is approved for data-field #20142. |
| omop_measurement | OMOP Measurement. This table is only included if your application is approved for data-field #20142. |
| omop_note | OMOP Note. This table is only included if your application is approved for data-field #20142. |
| omop_observation | OMOP Observation. This table is only included if your application is approved for data-field #20142. |
| omop_observation_period | OMOP Observation Period. This table is only included if your application is approved for data-field #20142. |
| omop_person | OMOP Person. This table is only included if your application is approved for data-field #20142. |
| omop_procedure_occurrence | OMOP Procedure Occurrence. This table is only included if your application is approved for data-field #20142. |
| omop_specimen | OMOP Specimen. This table is only included if your application is approved for data-field #20142. |
| omop_visit_detail | OMOP Visit Detail. This table is only included if your application is approved for data-field #20142. |
| omop_visit_occurrence | OMOP Visit Occurrence. This table is only included if your application is approved for data-field #20142. |

When listing tables in SQL, you may notice each table appearing twice, using a regular name and a versioned name, such as "gp_clinical" and "gp_clinical_v4_0_9b7a7f3". This naming scheme is part of the system's architecture, supporting data refreshes and participant withdrawals.

The "regularly named" table is actually a SQL VIEW pointing to the versioned table. When data is updated, the VIEW is switched to point to a new versioned table, and the old versioned table is deleted. Due to this behavior, please make sure to always use the regularly named tables - such as "gp_clinical" - because the versioned tables do not persist over time.

If your access application has been approved for data-fields 23146, 23148, and/or 23157, you will also see the following tables:

allele_23146, allele_23148, allele_23157, annotation_23146, annotation_23148, annotation_23157, assay_eid_map_23146, assay_eid_map_23148, assay_eid_map_23157, genotype_23146, genotype_23148, genotype_23157, pheno_assay_23146_link, pheno_assay_23148_link, pheno_assay_23157_link, rsid_lookup_r81_23146, rsid_lookup_r81_23148, and rsid_lookup_r81_23157.

These tables contain limited information about alleles and genotypes, transcribed into SQL from the pVCF files of data-fields 23146, 23148, and/or 23157 (along with added annotations). They are used by the Cohort Browser to populate the "GENOMICS" tab. They have not been optimized for direct SQL querying, and their schema and conventions are subject to change. For this reason, it is recommended that you access the bulk files instead of querying these tables directly.

Database Columns

For the main UK Biobank participant tables, the column-naming convention is generally as follows:

p<FIELD-ID>_i<INSTANCE-ID>_a<ARRAY-ID>

However, the following additional rules apply:

  • If a field is not instanced, the _i<INSTANCE-ID> piece is skipped altogether.

  • If a field is not arrayed, the _a<ARRAY-ID> piece is skipped altogether.

  • If a field is arrayed due to being multi-select, the field is converted into a single column of type "embedded array", and the _a<ARRAY-ID> piece is skipped altogether.

Examples:

  • Age at recruitment: p21022

  • Date of attending assessment centre: p53_i0, p53_i1, ...

  • Diagnoses - ICD10 (converted into embedded array): p41270
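
To illustrate these rules in code, here is a minimal sketch; make_field_name is a hypothetical helper defined only for demonstration and is not part of dxdata:

def make_field_name(field_id, instance=None, array=None):
    # Apply the p<FIELD-ID>_i<INSTANCE-ID>_a<ARRAY-ID> naming convention
    name = f"p{field_id}"
    if instance is not None:
        name += f"_i{instance}"
    if array is not None:
        name += f"_a{array}"
    return name

make_field_name(21022)   # 'p21022' (not instanced, not arrayed)
make_field_name(53, 0)   # 'p53_i0'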

For all other tables - such as hospital records, GP records, death records, and COVID-19 records - the column names are identical to what UK Biobank provides in its Showcase. For more information on the columns of these tables, consult Resource #138483 (hospital records), Resource #591 (GP records), Resource #115559 (death records), Resource #3151 (COVID-19 GP records), or Resource #1758 (COVID-19 test results).

Tips for Using SQL

The main participant data is horizontally split into multiple tables, and you may find that SQL is less than suitable for querying those tables directly. To access main participant data, consider using the dataset construct as discussed below.

For linked health care tables, it is easier to use SQL directly to extract data as a Spark DataFrame. The following example retrieves all GP records related to serum HDL cholesterol levels.

# Assumes you have already done: spark.sql("USE " + dispensed_database_name)
df = spark.sql("SELECT * FROM gp_clinical WHERE read_2 = '44P5.00' OR read_3 = '44P5.'")

Spark DataFrames are lazily evaluated. In the code block above, the command returns right away, assigning the variable df without executing the query. The query is only executed when its results are needed, potentially after additional transformations have been applied.

For example, calling df.count() later will execute the equivalent of a SELECT COUNT(*) query.
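
The following sketch illustrates this behavior, building on the df defined above and assuming the event_dt date column of gp_clinical:

# Transformations are recorded but not executed yet
recent = df.filter(df.event_dt >= "2010-01-01")
# An action such as count() triggers the actual distributed query
n = recent.count()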

For a list of all DataFrame functions, consult the PySpark DataFrame Documentation.

Accessing the Dataset Using Python

For an example Jupyter notebook that demonstrates how to extract data, see here.

As mentioned above, the dataset combines the low-level database structure with metadata from the UK Biobank Showcase. Database tables are exposed as virtual entities, and database columns are exposed as fields. The split participant tables are all combined into a single entity called participant.

After initializing variables in your notebook, you can load the dataset and access the participant entity as follows:

dataset = dxdata.load_dataset(id=dispensed_dataset_id)
participant = dataset["participant"]
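
To see which other entities the dataset exposes, you can list their names. A sketch, assuming the entities attribute of dxdata datasets:

entity_names = [entity.name for entity in dataset.entities]
print(entity_names)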

To fetch participant fields, you must first make a list of field names of interest. There are three main ways to look up field names:

  • If you already know the UK Biobank data-field id, or if you navigate to the UK Biobank Showcase and browse or search for data-fields, you can construct the field name using the rules mentioned above. For example, data-field 21002 (participant weight) corresponds to field names p21002_i0 through p21002_i3.

  • You can look up field names in the Cohort Browser. The following screenshot shows an example of searching for the "weight" keyword and locating the name of a field (p21002_i0, shown next to the Link label).

  • You can look up fields programmatically, by iterating over all fields in the participant.fields array, or by using the function participant.find_fields. Refer to the dxdata documentation for more information. The following example finds all fields with a matching case-insensitive keyword "weight" in their titles:

weight_fields = list(participant.find_fields(lambda f: "weight" in f.title.lower()))
weight_field_names = [f.name for f in weight_fields]
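
You can then inspect the matching fields, for example by printing each field's name and title:

for f in weight_fields:
    print(f.name, f.title)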

Once you have gathered field names of interest, you can create a Spark DataFrame of corresponding participant data using the retrieve_fields function:

field_names = ["eid", "p31", "p21002_i0"]
df = participant.retrieve_fields(names=field_names, engine=dxdata.connect())

This function automatically joins across the split participant tables as needed, returning the requested columns as a Spark DataFrame.
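
To preview the first few rows of the returned DataFrame:

df.show(5, truncate=False)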

Tips for Retrieving Fields

  • You can continue to work with the Spark DataFrame and leverage Spark functions for counting, filtering, aggregations, or statistics. Consult the PySpark DataFrame Documentation for more information. Spark functions are executed in a distributed manner across the Spark cluster.

    • If a particular Spark command is taking too long to evaluate, you can monitor its progress on the Spark console page. To do that, copy the URL of your current JupyterLab session (typically ending in ".dnanexus.cloud/lab?"), open a new browser tab, paste the URL, replace "/lab?" with ":8081/jobs/", and press Enter.

    • If you prefer to load all the results in memory, instead of keeping them in a parallelized and decentralized Spark DataFrame, simply convert the Spark DataFrame to a Pandas DataFrame by calling .toPandas(). This will return a Pandas DataFrame in memory, which you can manipulate further using other pandas functions. Pandas functionality runs in the same VM as JupyterLab and does not leverage the Spark cluster.

  • It is a good idea to always include the participant EID ("eid") as the first field name, so that it is returned as the first column. If you don't include it, the EID column will not be returned at all.

  • By default, the system returns the data as encoded by UK Biobank. For example, field p31 (participant sex) will be returned as an integer column with values of 0 and 1. To receive decoded values, supply the coding_values="replace" argument.

  • The returned DataFrame uses the field names as the column titles. If you prefer human-readable names, you can provide a mapping from field names to your own names using the column_aliases={"p21002_i0": "weight", ...} argument, as shown in the sketch after this list.
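
Putting these tips together, the following sketch retrieves decoded, renamed columns and loads them into memory as a pandas DataFrame (field names as in the earlier example; the aliases are illustrative):

df = participant.retrieve_fields(
    names=["eid", "p31", "p21002_i0"],
    coding_values="replace",  # decode coded values instead of returning raw codes
    column_aliases={"p31": "sex", "p21002_i0": "weight"},
    engine=dxdata.connect(),
)
pdf = df.toPandas()  # now an in-memory pandas DataFrame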

Accessing Cohorts

To retrieve the EIDs of a cohort that you previously saved via the Cohort Browser - in this example, a cohort named "controls" in the root folder - use the following code:

cohort = dxdata.load_cohort(folder="/", name="controls")
cohort_eids_df = spark.sql(cohort.sql)
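
For example, you can count the participants in the cohort:

print(cohort_eids_df.count())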

To retrieve participant fields for a cohort, supply the filter_sql=cohort.sql argument:

# Retrieve EID and age for each cohort participant
df = participant.retrieve_fields(names=["eid", "p21022"], filter_sql=cohort.sql, engine=dxdata.connect())
