Using Apache Spark for Data Processing and Machine Learning to Detect the Nature of Physical Activity from Sensor Data
The PAMAP2 Physical Activity Monitoring dataset contains data for 18 different physical activities, performed by 9 subjects wearing 3 inertial measurement units and a heart rate monitor. This classification task aims to distinguish activities of minimum, low, medium and high effort based on the sensor measurements given in the dataset. Check the raw data formatting in the cell below for more details.
We will load the data into the Hadoop Distributed File System (HDFS), process it with PySpark, and use the MLlib library in PySpark for machine learning algorithms that detect the physical activity from the sensor readings.
Raw data format
Synchronized and labeled raw data from all the sensors (3 IMUs and the HR-monitor) is merged
into 1 data file per subject per session (protocol or optional), available as text-files (.dat).
Each of the data-files contains 54 columns per row, the columns contain the following data:
– 1 timestamp (s)
– 2 activityID (see II.2. for the mapping to the activities)
– 3 heart rate (bpm)
– 4-20 IMU hand
– 21-37 IMU chest
– 38-54 IMU ankle
The IMU sensory data contains the following columns:
– 1 temperature (°C)
– 2-4 3D-acceleration data (m/s²), scale: ±16g, resolution: 13-bit
– 5-7 3D-acceleration data (m/s²), scale: ±6g, resolution: 13-bit*
– 8-10 3D-gyroscope data (rad/s)
– 11-13 3D-magnetometer data (μT)
– 14-17 orientation (invalid in this data collection)
- Activity IDs:
– 1 lying
– 2 sitting
– 3 standing
– 4 walking
– 5 running
– 6 cycling
– 7 Nordic walking
– 9 watching TV
– 10 computer work
– 11 car driving
– 12 ascending stairs
– 13 descending stairs
– 16 vacuum cleaning
– 17 ironing
– 18 folding laundry
– 19 house cleaning
– 20 playing soccer
– 24 rope jumping
– 0 other (transient activities)
Note: data labeled with activityID=0 should be discarded in any kind of analysis. This data mainly covers transient activities between performing different activities, e.g. going from one location to the next activity’s location, or waiting for the preparation of some equipment. Also, different parts of one subject’s recording (in the case when the data collection was aborted for some reason) were put together during these transient activities (noticeable by some “jumping” in the HR-data).
For the classification task we have binned the activity IDs listed above based on the intensity of each physical activity; these bins will be used as our target variable for classification:
Minimum: [lying, sitting, standing, watching TV, computer work, car driving, folding laundry] minimum_activity = [1,2,3,9,10,11,18]
Low: [descending stairs, ironing, house cleaning] low_activity = [13,17,19]
Medium: [walking, ascending stairs, vacuum cleaning] medium_activity = [4,12,16]
High: [running, cycling, Nordic walking, playing soccer, rope jumping] high_activity = [5,6,7,20,24]
Move raw data to HDFS and convert into ORC format
Follow the steps below to download the dataset, move the raw data from the local file system to HDFS, and convert it into ORC format for distributed processing.
Download the dataset
[hadoop@test-ser-vm01 pamap2]$ wget http://archive.ics.uci.edu/ml/machine-learning-databases/00231/PAMAP2_Dataset.zip
[hadoop@test-ser-vm01 pamap2]$ unzip PAMAP2_Dataset.zip
[hadoop@test-ser-vm01 pamap2]$ cd PAMAP2_Dataset/Protocol
[hadoop@test-ser-vm01 Protocol]$ ls -lrt
total 1271440
-rw-------. 1 hadoop hadoop 141698539 Jan 10 2012 subject101.dat
-rw-------. 1 hadoop hadoop 207349310 Jan 10 2012 subject102.dat
-rw-------. 1 hadoop hadoop 117863364 Jan 10 2012 subject103.dat
-rw-------. 1 hadoop hadoop 153227482 Jan 10 2012 subject104.dat
-rw-------. 1 hadoop hadoop 173773599 Jan 10 2012 subject105.dat
-rw-------. 1 hadoop hadoop 168263273 Jan 10 2012 subject106.dat
-rw-------. 1 hadoop hadoop 145823268 Jan 10 2012 subject107.dat
-rw-------. 1 hadoop hadoop 190019653 Jan 10 2012 subject108.dat
-rw-------. 1 hadoop hadoop 3891140 Jan 10 2012 subject109.dat
Create directory in HDFS and copy files from local to HDFS
[hadoop@test-ser-vm01 Protocol]$ hadoop fs -mkdir /mnt/data/scripts/notebooks/ashwin/data
[hadoop@test-ser-vm01 Protocol]$ hadoop fs -put subject* /mnt/data/scripts/notebooks/ashwin/data
[hadoop@test-ser-vm01 Protocol]$ hadoop fs -ls /mnt/data/scripts/notebooks/ashwin/data
Found 9 items
-rw-r--r-- 3 hadoop supergroup 141698539 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject101.dat
-rw-r--r-- 3 hadoop supergroup 207349310 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject102.dat
-rw-r--r-- 3 hadoop supergroup 117863364 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject103.dat
-rw-r--r-- 3 hadoop supergroup 153227482 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject104.dat
-rw-r--r-- 3 hadoop supergroup 173773599 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject105.dat
-rw-r--r-- 3 hadoop supergroup 168263273 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject106.dat
-rw-r--r-- 3 hadoop supergroup 145823268 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject107.dat
-rw-r--r-- 3 hadoop supergroup 190019653 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject108.dat
-rw-r--r-- 3 hadoop supergroup 3891140 2018-03-14 08:41 /mnt/data/scripts/notebooks/ashwin/data/subject109.dat
Create a Hive external table with schema and load the data
hive> CREATE SCHEMA IF NOT EXISTS test1;
OK.
hive>CREATE EXTERNAL TABLE IF NOT EXISTS test1.pamap2_table
(time DOUBLE, activity DOUBLE, heart_rate DOUBLE,hand_temp DOUBLE,hand_acc16g_1 DOUBLE, hand_acc16g_2 DOUBLE, hand_acc16g_3 DOUBLE, hand_acc6g_1 DOUBLE, hand_acc6g_2 DOUBLE, hand_acc6g_3 DOUBLE, hand_gyro_1 DOUBLE,hand_gyro_2 DOUBLE,hand_gyro_3 DOUBLE, hand_mag_1 DOUBLE, hand_mag_2 DOUBLE, hand_mag_3 DOUBLE, hand_orient_1 DOUBLE, hand_orient_2 DOUBLE, hand_orient_3 DOUBLE, hand_orient_4 DOUBLE,
chest_temp DOUBLE,chest_acc16g_1 DOUBLE, chest_acc16g_2 DOUBLE, chest_acc16g_3 DOUBLE, chest_acc6g_1 DOUBLE, chest_acc6g_2 DOUBLE, chest_acc6g_3 DOUBLE, chest_gyro_1 DOUBLE,chest_gyro_2 DOUBLE,chest_gyro_3 DOUBLE, chest_mag_1 DOUBLE, chest_mag_2 DOUBLE, chest_mag_3 DOUBLE, chest_orient_1 DOUBLE, chest_orient_2 DOUBLE, chest_orient_3 DOUBLE, chest_orient_4 DOUBLE,
ankle_temp DOUBLE,ankle_acc16g_1 DOUBLE, ankle_acc16g_2 DOUBLE, ankle_acc16g_3 DOUBLE, ankle_acc6g_1 DOUBLE, ankle_acc6g_2 DOUBLE, ankle_acc6g_3 DOUBLE, ankle_gyro_1 DOUBLE,ankle_gyro_2 DOUBLE,ankle_gyro_3 DOUBLE, ankle_mag_1 DOUBLE, ankle_mag_2 DOUBLE, ankle_mag_3 DOUBLE, ankle_orient_1 DOUBLE, ankle_orient_2 DOUBLE, ankle_orient_3 DOUBLE, ankle_orient_4 DOUBLE )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ' '
STORED AS TEXTFILE
LOCATION '/mnt/data/scripts/notebooks/ashwin/data/';
OK.
The import can be verified by listing the first few rows in the table:
hive> select * from test1.pamap2_table limit 2;
OK
8.38 0.0 104.0 30.0 2.37223 8.60074 3.51048 2.43954 8.76165 3.35465 -0.0922174 0.0568115 -0.0158445 14.6806 -69.2128 -5.58905 1.0 0.0 0.0 0.0 31.8125 0.23808 9.80003 -1.68896 0.265304 9.81549 -1.41344 -0.00506495 -0.00678097 -0.00566295 0.47196 -51.0499 43.2903 1.0 0.0 0.0 0.0 30.3125 9.65918 -1.65569 -0.0997967 9.64689 -1.55576 0.310404 0.00830026 0.00925038 -0.0175803 -61.1888 -38.9599 -58.1438 1.0 0.0 0.0 0.0
8.39 0.0 NaN 30.0 2.18837 8.5656 3.66179 2.39494 8.55081 3.64207 -0.0244132 0.0477585 0.00647434 14.8991 -69.2224 -5.82311 1.0 0.0 0.0 0.0 31.8125 0.31953 9.61282 -1.49328 0.234939 9.78539 -1.42846 0.013685 0.00148646 -0.0415218 1.0169 -50.3966 43.1768 1.0 0.0 0.0 0.0 30.3125 9.6937 -1.57902 -0.215687 9.6167 -1.6163 0.280488 -0.00657665 -0.00463778 3.6825E-4 -59.8479 -38.8919 -58.5253 1.0 0.0 0.0 0.0
Time taken: 0.191 seconds, Fetched: 2 row(s)
Create a Hive-managed ORC table to convert the data into ORC format
hive> CREATE TABLE test1.pamap2_orc
(time DOUBLE, activity DOUBLE, heart_rate DOUBLE,hand_temp DOUBLE,hand_acc16g_1 DOUBLE, hand_acc16g_2 DOUBLE, hand_acc16g_3 DOUBLE, hand_acc6g_1 DOUBLE, hand_acc6g_2 DOUBLE, hand_acc6g_3 DOUBLE, hand_gyro_1 DOUBLE,hand_gyro_2 DOUBLE,hand_gyro_3 DOUBLE, hand_mag_1 DOUBLE, hand_mag_2 DOUBLE, hand_mag_3 DOUBLE, hand_orient_1 DOUBLE, hand_orient_2 DOUBLE, hand_orient_3 DOUBLE, hand_orient_4 DOUBLE,
chest_temp DOUBLE,chest_acc16g_1 DOUBLE, chest_acc16g_2 DOUBLE, chest_acc16g_3 DOUBLE, chest_acc6g_1 DOUBLE, chest_acc6g_2 DOUBLE, chest_acc6g_3 DOUBLE, chest_gyro_1 DOUBLE,chest_gyro_2 DOUBLE,chest_gyro_3 DOUBLE, chest_mag_1 DOUBLE, chest_mag_2 DOUBLE, chest_mag_3 DOUBLE, chest_orient_1 DOUBLE, chest_orient_2 DOUBLE, chest_orient_3 DOUBLE, chest_orient_4 DOUBLE,
ankle_temp DOUBLE,ankle_acc16g_1 DOUBLE, ankle_acc16g_2 DOUBLE, ankle_acc16g_3 DOUBLE, ankle_acc6g_1 DOUBLE, ankle_acc6g_2 DOUBLE, ankle_acc6g_3 DOUBLE, ankle_gyro_1 DOUBLE,ankle_gyro_2 DOUBLE,ankle_gyro_3 DOUBLE, ankle_mag_1 DOUBLE, ankle_mag_2 DOUBLE, ankle_mag_3 DOUBLE, ankle_orient_1 DOUBLE, ankle_orient_2 DOUBLE, ankle_orient_3 DOUBLE, ankle_orient_4 DOUBLE )
STORED AS ORC;
OK
As we have already loaded the temporary table pamap2_table, it’s time to load the data from it into the actual ORC table pamap2_orc.
Insert the data from the external table into the Hive-managed table.
hive> INSERT INTO TABLE test1.pamap2_orc SELECT * FROM test1.pamap2_table;
Query ID = hadoop_20180314092522_71507bf7-1c30-4376-87cd-b38129a7f6a9
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks is set to 0 since there's no reduce operator
Starting Job = job_1552283641705_0251, Tracking URL = http://test-ser-vm01:8088/proxy/application_1552283641705_0251/
Kill Command = /mnt/data/hadoop/hadoop/bin/hadoop job -kill job_1552283641705_0251
Hadoop job information for Stage-1: number of mappers: 9; number of reducers: 0
2018-03-14 09:25:56,207 Stage-1 map = 0%, reduce = 0%
2018-03-14 09:26:05,033 Stage-1 map = 11%, reduce = 0%, Cumulative CPU 6.13 sec
2018-03-14 09:26:13,402 Stage-1 map = 22%, reduce = 0%, Cumulative CPU 11.18 sec
2018-03-14 09:26:14,445 Stage-1 map = 33%, reduce = 0%, Cumulative CPU 19.27 sec
2018-03-14 09:26:22,804 Stage-1 map = 66%, reduce = 0%, Cumulative CPU 84.16 sec
2018-03-14 09:26:23,846 Stage-1 map = 80%, reduce = 0%, Cumulative CPU 100.77 sec
2018-03-14 09:26:24,889 Stage-1 map = 88%, reduce = 0%, Cumulative CPU 119.36 sec
2018-03-14 09:26:30,099 Stage-1 map = 90%, reduce = 0%, Cumulative CPU 132.35 sec
2018-03-14 09:26:32,183 Stage-1 map = 96%, reduce = 0%, Cumulative CPU 135.55 sec
2018-03-14 09:26:38,455 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 143.93 sec
MapReduce Total cumulative CPU time: 2 minutes 23 seconds 930 msec
Ended Job = job_1552283641705_0251
Stage-4 is selected by condition resolver.
Stage-3 is filtered out by condition resolver.
Stage-5 is filtered out by condition resolver.
Moving data to directory hdfs://test-ser-vm01:9000/user/hive/warehouse/test1.db/pamap2_orc/.hive-staging_hive_2018-03-14_09-25-22_801_2601980388252549428-1/-ext-10000
Loading data to table test1.pamap2_orc
MapReduce Jobs Launched:
Stage-Stage-1: Map: 9 Cumulative CPU: 143.93 sec HDFS Read: 1302077635 HDFS Write: 772976727 SUCCESS
Total MapReduce CPU Time Spent: 2 minutes 23 seconds 930 msec
OK
Time taken: 78.615 seconds
Verify the data import into the ORC-formatted table:
hive> select * from test1.pamap2_orc limit 2;
OK
5.64 0.0 NaN 33.0 2.79143 7.55389 -7.06374 2.87553 7.88823 -6.76139 1.0164 -0.28941 1.38207 -11.6508 -3.73683 31.1784 1.0 0.0 0.0 0.0 36.125 1.94739 9.59644 -3.12873 1.81868 9.49711 -2.91989 0.124025 0.112482 -0.0449469 -20.2905 -32.0492 8.67906 1.0 0.0 0.0 0.0 33.8125 9.84408 -0.808951 -1.64674 9.73055 -0.846832 -1.29665 -0.027148 -0.0311901 -0.0408973 -47.7695 -2.58701 59.8481 -0.0128709 0.747947 -0.0798406 0.658813
5.65 0.0 NaN 33.0 2.86086 7.43814 -7.21626 2.84248 7.63164 -6.8514 1.08269 -0.393965 1.60935 -11.6575 -3.18648 30.7215 1.0 0.0 0.0 0.0 36.125 1.7512 9.6334 -3.32601 1.74445 9.69355 -2.96421 0.132679 0.0608292 -0.0441676 -20.6409 -31.6989 8.30648 1.0 0.0 0.0 0.0 33.8125 9.83968 -0.807666 -1.80115 9.73049 -0.816601 -1.31189 0.0128035 -0.0363842 -0.0148455 -47.7624 -2.81438 60.3407 0.0140248 -0.74841 0.0790426 -0.65836
Time taken: 0.158 seconds, Fetched: 2 row(s)
Successfully converted into ORC format
hive> describe formatted test1.pamap2_orc;
OK
# col_name data_type comment
time double
activity double
heart_rate double
hand_temp double
hand_acc16g_1 double
hand_acc16g_2 double
. .
. .
. .
ankle_orient_3 double
ankle_orient_4 double
# Detailed Table Information
Database: test1
Owner: hadoop
CreateTime: Thu Mar 14 09:24:11 UTC 2018
LastAccessTime: UNKNOWN
Retention: 0
Location: hdfs://test-ser-vm01:9000/user/hive/warehouse/test1.db/pamap2_orc
Table Type: MANAGED_TABLE
Table Parameters:
COLUMN_STATS_ACCURATE {\"BASIC_STATS\":\"true\"}
numFiles 9
numRows 2872533
rawDataSize 1240934256
totalSize 772975987
transient_lastDdlTime 1552555601
# Storage Information
SerDe Library: org.apache.hadoop.hive.ql.io.orc.OrcSerde
InputFormat: org.apache.hadoop.hive.ql.io.orc.OrcInputFormat
OutputFormat: org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat
Compressed: No
Num Buckets: -1
Bucket Columns: []
Sort Columns: []
Storage Desc Params:
serialization.format 1
Time taken: 0.604 seconds, Fetched: 83 row(s)
[hadoop@test-ser-vm01 Protocol]$ hadoop fs -ls /user/hive/warehouse/test1.db/pamap2_orc
Found 9 items
-rwxr-xr-x 3 hadoop supergroup 202673523 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000000_0
-rwxr-xr-x 3 hadoop supergroup 153111161 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000001_0
-rwxr-xr-x 3 hadoop supergroup 104919944 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000002_0
-rwxr-xr-x 3 hadoop supergroup 101485801 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000003_0
-rwxr-xr-x 3 hadoop supergroup 86684110 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000004_0
-rwxr-xr-x 3 hadoop supergroup 81040336 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000005_0
-rwxr-xr-x 3 hadoop supergroup 27038160 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000006_0
-rwxr-xr-x 3 hadoop supergroup 11794464 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000007_0
-rwxr-xr-x 3 hadoop supergroup 4228488 2018-03-14 09:26 /user/hive/warehouse/test1.db/pamap2_orc/000008_0
PySpark for data processing and machine learning
Utilize Spark SQL and the ml library to perform the data pre-processing steps and the machine learning for classification.
Importing necessary modules
from pyspark.sql import SparkSession
from pyspark import SparkConf, SparkContext
import pyspark.sql.functions as fn
from pyspark.ml.classification import RandomForestClassifier
from pyspark.ml.clustering import KMeans,BisectingKMeans
from pyspark.ml.feature import StringIndexer,OneHotEncoder,VectorAssembler,PCA as sparkpca,MinMaxScaler as sparkMinMaxScaler,Normalizer,VectorIndexer
from pyspark.ml import Pipeline as SparkPipeline,PipelineModel
import pandas as pd
from matplotlib import pyplot as plt
from pyspark.sql.types import DateType
from pyspark.ml.evaluation import RegressionEvaluator,MulticlassClassificationEvaluator,BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql.types import *
from datetime import datetime
import numpy as np
%matplotlib inline
from pyspark.sql.functions import array, explode, lit
import warnings
from operator import add
import re
from functools import reduce
warnings.filterwarnings('ignore')
Creating a SparkSession object and reading the ORC input files
spark = SparkSession.builder.getOrCreate()
inputdataraw = spark.read.orc('/user/hive/warehouse/test1.db/pamap2_orc')
inputdataraw.cache()
inputdataraw.count()
2872533
inputdataraw.groupBy('activity').agg(fn.count('time')).show()
+--------+-----------+
|activity|count(time)|
+--------+-----------+
| 0.0| 929661|
| 7.0| 188107|
| 1.0| 192523|
| 4.0| 238761|
| 3.0| 189931|
| 2.0| 185188|
| 17.0| 238690|
| 13.0| 104944|
| 6.0| 164600|
| 24.0| 49360|
| 5.0| 98199|
| 16.0| 175353|
| 12.0| 117216|
+--------+-----------+
inputdataraw.printSchema()
root
|-- time: double (nullable = true)
|-- activity: double (nullable = true)
|-- heart_rate: double (nullable = true)
|-- hand_temp: double (nullable = true)
|-- hand_acc16g_1: double (nullable = true)
|-- hand_acc16g_2: double (nullable = true)
|-- hand_acc16g_3: double (nullable = true)
|-- hand_acc6g_1: double (nullable = true)
|-- hand_acc6g_2: double (nullable = true)
|-- hand_acc6g_3: double (nullable = true)
|-- hand_gyro_1: double (nullable = true)
|-- hand_gyro_2: double (nullable = true)
|-- hand_gyro_3: double (nullable = true)
|-- hand_mag_1: double (nullable = true)
|-- hand_mag_2: double (nullable = true)
|-- hand_mag_3: double (nullable = true)
|-- hand_orient_1: double (nullable = true)
|-- hand_orient_2: double (nullable = true)
|-- hand_orient_3: double (nullable = true)
|-- hand_orient_4: double (nullable = true)
|-- chest_temp: double (nullable = true)
|-- chest_acc16g_1: double (nullable = true)
|-- chest_acc16g_2: double (nullable = true)
|-- chest_acc16g_3: double (nullable = true)
|-- chest_acc6g_1: double (nullable = true)
|-- chest_acc6g_2: double (nullable = true)
|-- chest_acc6g_3: double (nullable = true)
|-- chest_gyro_1: double (nullable = true)
|-- chest_gyro_2: double (nullable = true)
|-- chest_gyro_3: double (nullable = true)
|-- chest_mag_1: double (nullable = true)
|-- chest_mag_2: double (nullable = true)
|-- chest_mag_3: double (nullable = true)
|-- chest_orient_1: double (nullable = true)
|-- chest_orient_2: double (nullable = true)
|-- chest_orient_3: double (nullable = true)
|-- chest_orient_4: double (nullable = true)
|-- ankle_temp: double (nullable = true)
|-- ankle_acc16g_1: double (nullable = true)
|-- ankle_acc16g_2: double (nullable = true)
|-- ankle_acc16g_3: double (nullable = true)
|-- ankle_acc6g_1: double (nullable = true)
|-- ankle_acc6g_2: double (nullable = true)
|-- ankle_acc6g_3: double (nullable = true)
|-- ankle_gyro_1: double (nullable = true)
|-- ankle_gyro_2: double (nullable = true)
|-- ankle_gyro_3: double (nullable = true)
|-- ankle_mag_1: double (nullable = true)
|-- ankle_mag_2: double (nullable = true)
|-- ankle_mag_3: double (nullable = true)
|-- ankle_orient_1: double (nullable = true)
|-- ankle_orient_2: double (nullable = true)
|-- ankle_orient_3: double (nullable = true)
|-- ankle_orient_4: double (nullable = true)
inputdataraw.describe().toPandas()
 | summary | time | activity | heart_rate | hand_temp | hand_acc16g_1 | hand_acc16g_2 | hand_acc16g_3 | hand_acc6g_1 | hand_acc6g_2 | ... | ankle_gyro_1 | ankle_gyro_2 | ankle_gyro_3 | ankle_mag_1 | ankle_mag_2 | ankle_mag_3 | ankle_orient_1 | ankle_orient_2 | ankle_orient_3 | ankle_orient_4
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---
0 | count | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | ... | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 | 2872533 |
1 | mean | 1834.3538678511266 | 5.4662425114002176 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
2 | stddev | 1105.6889982931357 | 6.331333413102728 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
3 | min | 5.64 | 0.0 | 57.0 | 24.75 | -145.367 | -104.301 | -101.452 | -61.4895 | -61.868 | ... | -23.995 | -18.1269 | -14.0196 | -172.865 | -137.908 | -109.289 | -0.253628 | -0.956876 | -0.876838 | -0.997281 |
4 | max | 4475.63 | 24.0 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | ... | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN | NaN |
5 rows × 55 columns
inputdataraw.show(1)
+----+--------+----------+---------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+----------+----------+----------+-------------+-------------+-------------+-------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+
|time|activity|heart_rate|hand_temp|hand_acc16g_1|hand_acc16g_2|hand_acc16g_3|hand_acc6g_1|hand_acc6g_2|hand_acc6g_3|hand_gyro_1|hand_gyro_2|hand_gyro_3|hand_mag_1|hand_mag_2|hand_mag_3|hand_orient_1|hand_orient_2|hand_orient_3|hand_orient_4|chest_temp|chest_acc16g_1|chest_acc16g_2|chest_acc16g_3|chest_acc6g_1|chest_acc6g_2|chest_acc6g_3|chest_gyro_1|chest_gyro_2|chest_gyro_3|chest_mag_1|chest_mag_2|chest_mag_3|chest_orient_1|chest_orient_2|chest_orient_3|chest_orient_4|ankle_temp|ankle_acc16g_1|ankle_acc16g_2|ankle_acc16g_3|ankle_acc6g_1|ankle_acc6g_2|ankle_acc6g_3|ankle_gyro_1|ankle_gyro_2|ankle_gyro_3|ankle_mag_1|ankle_mag_2|ankle_mag_3|ankle_orient_1|ankle_orient_2|ankle_orient_3|ankle_orient_4|
+----+--------+----------+---------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+----------+----------+----------+-------------+-------------+-------------+-------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+
|5.64| 0.0| NaN| 33.0| 2.79143| 7.55389| -7.06374| 2.87553| 7.88823| -6.76139| 1.0164| -0.28941| 1.38207| -11.6508| -3.73683| 31.1784| 1.0| 0.0| 0.0| 0.0| 36.125| 1.94739| 9.59644| -3.12873| 1.81868| 9.49711| -2.91989| 0.124025| 0.112482| -0.0449469| -20.2905| -32.0492| 8.67906| 1.0| 0.0| 0.0| 0.0| 33.8125| 9.84408| -0.808951| -1.64674| 9.73055| -0.846832| -1.29665| -0.027148| -0.0311901| -0.0408973| -47.7695| -2.58701| 59.8481| -0.0128709| 0.747947| -0.0798406| 0.658813|
+----+--------+----------+---------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+----------+----------+----------+-------------+-------------+-------------+-------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+----------+--------------+--------------+--------------+-------------+-------------+-------------+------------+------------+------------+-----------+-----------+-----------+--------------+--------------+--------------+--------------+
only showing top 1 row
Referring to the activity ID mappings, we can infer that activity ID = 0 should not be considered in the analysis, as it indicates transient movement between different activities. Let's filter those rows out.
inputdataraw = inputdataraw.withColumn('new_time',fn.round(fn.col('time')%10))
inputdata = inputdataraw.filter(fn.col('activity')!=0)
inputdata = inputdata.withColumn('activity',fn.col('activity').cast('int'))
inputdata.select('activity').distinct().orderBy('activity').show()
+--------+
|activity|
+--------+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
| 7|
| 12|
| 13|
| 16|
| 17|
| 24|
+--------+
We expect heart rate to be an important measure for identifying the nature of a physical activity, so let's group the data by activity and check the mean heart rate within each group to look for a pattern and validate this hypothesis.
heart_rate_agg = inputdata.filter(~fn.isnan('heart_rate')).groupBy('activity').agg(fn.mean('heart_rate').alias('avg_heart_rate'))
heart_rate_agg.sort(fn.desc('avg_heart_rate')).show()
+--------+------------------+
|activity| avg_heart_rate|
+--------+------------------+
| 24|161.98139122729287|
| 5|156.59581411049848|
| 12|129.52348491922683|
| 13|129.15417491921193|
| 6| 124.8799521403882|
| 7|123.82870101174555|
| 4|112.78986505095016|
| 16| 104.1980908410282|
| 17| 90.06959118052366|
| 3| 88.55763688760807|
| 2| 80.01258195995038|
| 1| 75.53568181818181|
+--------+------------------+
Indeed, as expected, high-intensity activities show markedly higher average heart rates than low-intensity activities.
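Since pandas and matplotlib are already imported, the per-activity averages can also be compared visually with a quick bar chart. A sketch (the plot itself is not shown here):
hr_pd = heart_rate_agg.sort('avg_heart_rate').toPandas()
hr_pd.plot(x='activity', y='avg_heart_rate', kind='barh', figsize=(10, 6), legend=False)
plt.xlabel('average heart rate (bpm)')
plt.title('Average heart rate by activity')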
Checking and treating missing (NaN) values
from functools import reduce
# count NaN markers per column and stack the per-column counts into a single DataFrame
null_counts = reduce(lambda x, y: x.union(y),
                     (inputdata.agg(fn.count(fn.when(fn.isnan(c), c)).alias('null_count'))
                               .select(fn.lit(c).alias('col_name'), 'null_count')
                      for c in inputdata.columns))
Missing sensor data is due to wireless data dropping; missing values are indicated with NaN. This is a case of missingness completely at random.
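An equivalent single-pass alternative is to compute all per-column NaN counts in one select. A sketch (output omitted; nan_counts_row is a hypothetical name for the example):
nan_counts_row = inputdata.select(
    [fn.count(fn.when(fn.isnan(c), c)).alias(c) for c in inputdata.columns]).collect()[0]
nan_counts_row['heart_rate']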
null_counts.sort(fn.desc('null_count')).show(100)
+--------------+----------+
| col_name|null_count|
+--------------+----------+
| heart_rate| 1765464|
| hand_acc16g_2| 11124|
| hand_orient_4| 11124|
| hand_orient_1| 11124|
| hand_mag_1| 11124|
| hand_acc6g_3| 11124|
| hand_acc6g_2| 11124|
| hand_temp| 11124|
| hand_gyro_2| 11124|
| hand_orient_3| 11124|
| hand_acc16g_1| 11124|
| hand_acc16g_3| 11124|
| hand_orient_2| 11124|
| hand_gyro_3| 11124|
| hand_acc6g_1| 11124|
| hand_gyro_1| 11124|
| hand_mag_3| 11124|
| hand_mag_2| 11124|
|ankle_orient_1| 8507|
| ankle_acc6g_2| 8507|
|ankle_orient_4| 8507|
| ankle_gyro_2| 8507|
|ankle_acc16g_1| 8507|
| ankle_mag_1| 8507|
|ankle_orient_3| 8507|
|ankle_acc16g_2| 8507|
|ankle_acc16g_3| 8507|
| ankle_gyro_3| 8507|
|ankle_orient_2| 8507|
| ankle_acc6g_3| 8507|
| ankle_mag_3| 8507|
| ankle_mag_2| 8507|
| ankle_acc6g_1| 8507|
| ankle_temp| 8507|
| ankle_gyro_1| 8507|
| chest_gyro_1| 2420|
| chest_acc6g_2| 2420|
| chest_mag_1| 2420|
|chest_orient_2| 2420|
|chest_acc16g_3| 2420|
| chest_mag_2| 2420|
| chest_acc6g_1| 2420|
| chest_gyro_2| 2420|
| chest_mag_3| 2420|
| chest_acc6g_3| 2420|
| chest_temp| 2420|
|chest_orient_3| 2420|
|chest_orient_4| 2420|
|chest_acc16g_1| 2420|
|chest_orient_1| 2420|
| chest_gyro_3| 2420|
|chest_acc16g_2| 2420|
| time| 0|
| activity| 0|
+--------------+----------+
Since the sensor data is time-ordered, we can forward-fill and backward-fill the null values using a Spark SQL window function, partitioning by activity and ordering by time.
# replace NaN markers with proper nulls so that window-based filling can ignore them
for column in inputdata.columns:
    inputdata = inputdata.withColumn(column, fn.when(fn.isnan(fn.col(column)), None).otherwise(fn.col(column)))
from pyspark.sql import Window
# forward fill: carry the last non-null value seen so far within each activity
window_ffill = Window.partitionBy('activity').orderBy('time').rowsBetween(Window.unboundedPreceding, 0)
# backward fill: take the first non-null value ahead, for nulls at the start of a partition
window_bfill = Window.partitionBy('activity').orderBy('time').rowsBetween(0, Window.unboundedFollowing)
for c in [x for x in inputdata.columns if x not in ['time', 'activity']]:
    inputdata = inputdata.withColumn(c, fn.last(fn.col(c), ignorenulls=True).over(window_ffill))
for c in [x for x in inputdata.columns if x not in ['time', 'activity']]:
    inputdata = inputdata.withColumn(c, fn.first(fn.col(c), ignorenulls=True).over(window_bfill))
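To make the window-fill mechanics concrete, here is a tiny self-contained illustration on a toy DataFrame (a sketch; the demo column names are invented for the example):
demo = spark.createDataFrame(
    [(1, 1.0, 10.0), (1, 2.0, None), (1, 3.0, 12.0), (2, 1.0, None), (2, 2.0, 20.0)],
    ['activity', 'time', 'heart_rate'])
w_f = Window.partitionBy('activity').orderBy('time').rowsBetween(Window.unboundedPreceding, 0)
w_b = Window.partitionBy('activity').orderBy('time').rowsBetween(0, Window.unboundedFollowing)
# forward fill first, then fall back to the next non-null value ahead for leading nulls
demo = demo.withColumn('hr_filled',
                       fn.coalesce(fn.last('heart_rate', ignorenulls=True).over(w_f),
                                   fn.first('heart_rate', ignorenulls=True).over(w_b)))
demo.orderBy('activity', 'time').show()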
null_counts_check =reduce(lambda x,y:x.union(y),(inputdata.agg(fn.count(fn.when(fn.isnull(c), c)).alias('null_count')).select(fn.lit(c).alias('col_name'),'null_count') for c in inputdata.columns))
null_counts_check.sort(fn.desc('null_count')).show()
+--------------+----------+
| col_name|null_count|
+--------------+----------+
|ankle_acc16g_3| 0|
| hand_temp| 0|
| ankle_acc6g_1| 0|
| hand_orient_2| 0|
|chest_acc16g_1| 0|
| ankle_gyro_1| 0|
| hand_gyro_3| 0|
| ankle_acc6g_3| 0|
| hand_orient_3| 0|
| ankle_mag_3| 0|
| activity| 0|
| chest_mag_2| 0|
|chest_orient_2| 0|
| ankle_acc6g_2| 0|
|ankle_acc16g_1| 0|
| chest_acc6g_3| 0|
| chest_gyro_1| 0|
| hand_acc16g_1| 0|
| chest_gyro_2| 0|
| chest_acc6g_2| 0|
+--------------+----------+
only showing top 20 rows
Feature engineering
For each sensor measurement that is available at all three body positions (hand, chest and ankle), let's combine the three readings and generate aggregated features such as the sum, mean, standard deviation, min, max and range (max-min).
##feature engineering
hand_cols = [col for col in inputdata.columns if re.search(r"hand",col)]
chest_cols = [col for col in inputdata.columns if re.search(r"chest",col)]
ankle_cols = [col for col in inputdata.columns if re.search(r"ankle",col)]
sensor_cols = ['temp',
'acc16g_1',
'acc16g_2',
'acc16g_3',
'acc6g_1',
'acc6g_2',
'acc6g_3',
'gyro_1',
'gyro_2',
'gyro_3',
'mag_1',
'mag_2',
'mag_3',
'orient_1',
'orient_2',
'orient_3',
'orient_4']
for col_nm, a, b, c in zip(sensor_cols, hand_cols, chest_cols, ankle_cols):
    inputdata = inputdata.withColumn(col_nm+'all_sum', fn.col(a)+fn.col(b)+fn.col(c))
    inputdata = inputdata.withColumn(col_nm+'all_max', fn.greatest(fn.col(a), fn.col(b), fn.col(c)))
    inputdata = inputdata.withColumn(col_nm+'all_min', fn.least(fn.col(a), fn.col(b), fn.col(c)))
    inputdata = inputdata.withColumn(col_nm+'all_max-min', fn.col(col_nm+'all_max')-fn.col(col_nm+'all_min'))
    inputdata = inputdata.withColumn(col_nm+'all_mean', fn.col(col_nm+'all_sum')/3)
    inputdata = inputdata.withColumn(col_nm+'all_std', fn.sqrt(reduce(add, ((fn.col(x) - fn.col(col_nm+'all_mean')) ** 2 for x in [a, b, c])) / 2))
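A quick sanity check of a few of the newly created aggregate columns can confirm the loop worked as intended. A sketch (output not shown):
inputdata.select('tempall_sum', 'tempall_mean', 'tempall_std').show(3)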
Preparing target variable
From the activity list we can group the activities, based on the intensity of the physical activity carried out, into minimal, low, medium and high intensity bins for classification purposes.
#Minimum: [lying, sitting, standing, watching TV, computer work, car driving, folding laundry]
minimum_activity = [1,2,3,9,10,11,18]
#Low: [descending stairs, ironing, house cleaning]
low_activity = [13,17,19]
#Medium: [walking, ascending stairs, vacuum cleaning]
medium_activity = [4,12,16]
#High: [running, cycling, Nordic walking, playing soccer, rope jumping]
high_activity = [5,6,7,20,24]
##target labels
inputdata = inputdata.withColumn('target',fn.when(fn.col('activity').isin(minimum_activity),'min'))
inputdata = inputdata.withColumn('target',fn.when(fn.col('activity').isin(low_activity),'low').otherwise(fn.col('target')))
inputdata = inputdata.withColumn('target',fn.when(fn.col('activity').isin(medium_activity),'medium').otherwise(fn.col('target')))
inputdata = inputdata.withColumn('target',fn.when(fn.col('activity').isin(high_activity),'high').otherwise(fn.col('target')))
inputdata.select(fn.col('target')).distinct().show()
+------+
|target|
+------+
| low|
| high|
|medium|
| min|
+------+
Perform label indexing for the target variable
inputdata = inputdata.withColumn('label',fn.when(fn.col('target')=='min',0))
inputdata = inputdata.withColumn('label',fn.when(fn.col('target')=='low',1).otherwise(fn.col('label')))
inputdata = inputdata.withColumn('label',fn.when(fn.col('target')=='medium',2).otherwise(fn.col('label')))
inputdata = inputdata.withColumn('label',fn.when(fn.col('target')=='high',3).otherwise(fn.col('label')))
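As an aside, the same indexing could be done with the StringIndexer transformer that is already imported; note that StringIndexer assigns indices by descending label frequency, so the resulting indices would generally differ from the explicit 0-3 mapping above. A sketch (the output column name label_idx is arbitrary):
indexer = StringIndexer(inputCol='target', outputCol='label_idx')
inputdata_indexed = indexer.fit(inputdata).transform(inputdata)
inputdata_indexed.select('target', 'label_idx').distinct().show()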
Machine Learning using PySpark
Utilizing machine learning to detect the intensity of physical activity from the readings of wearable devices that track body motion and basic physiological parameters such as heart rate, temperature, acceleration, gyroscope and orientation measurements from different parts of the body.
Now that the data processing and transformation with Spark SQL is complete, let's perform machine learning using the PySpark ml library.
inputdata.cache()
inputdata.count()
1942872
inputdata.select('target','label').distinct().show()
+------+-----+
|target|label|
+------+-----+
| high| 3|
| min| 0|
| low| 1|
|medium| 2|
+------+-----+
Preparing train/test data and features/target
train,test = inputdata.randomSplit([0.8, 0.2])
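Optionally, if the split needs to be reproducible, randomSplit also accepts a seed, and the class balance of the resulting sets can be checked before training. A sketch (the seed value 42 is arbitrary):
train, test = inputdata.randomSplit([0.8, 0.2], seed=42)
train.groupBy('label').count().orderBy('label').show()
test.groupBy('label').count().orderBy('label').show()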
features = ['heart_rate','hand_temp','hand_acc16g_1','hand_acc16g_2','hand_acc16g_3','hand_acc6g_1','hand_acc6g_2','hand_acc6g_3','hand_gyro_1',
'hand_gyro_2','hand_gyro_3','hand_mag_1','hand_mag_2','hand_mag_3','hand_orient_1','hand_orient_2','hand_orient_3','hand_orient_4',
'chest_temp','chest_acc16g_1','chest_acc16g_2','chest_acc16g_3','chest_acc6g_1','chest_acc6g_2','chest_acc6g_3','chest_gyro_1',
'chest_gyro_2','chest_gyro_3','chest_mag_1','chest_mag_2','chest_mag_3','chest_orient_1','chest_orient_2','chest_orient_3',
'chest_orient_4','ankle_temp','ankle_acc16g_1','ankle_acc16g_2','ankle_acc16g_3','ankle_acc6g_1','ankle_acc6g_2','ankle_acc6g_3',
'ankle_gyro_1','ankle_gyro_2','ankle_gyro_3','ankle_mag_1','ankle_mag_2','ankle_mag_3','ankle_orient_1','ankle_orient_2',
'ankle_orient_3','ankle_orient_4','tempall_sum','tempall_max','tempall_min','tempall_max-min','tempall_mean',
'tempall_std','acc16g_1all_sum','acc16g_1all_max','acc16g_1all_min','acc16g_1all_max-min','acc16g_1all_mean','acc16g_1all_std',
'acc16g_2all_sum','acc16g_2all_max','acc16g_2all_min','acc16g_2all_max-min','acc16g_2all_mean','acc16g_2all_std','acc16g_3all_sum',
'acc16g_3all_max','acc16g_3all_min','acc16g_3all_max-min','acc16g_3all_mean','acc16g_3all_std','acc6g_1all_sum','acc6g_1all_max',
'acc6g_1all_min','acc6g_1all_max-min','acc6g_1all_mean','acc6g_1all_std','acc6g_2all_sum','acc6g_2all_max','acc6g_2all_min',
'acc6g_2all_max-min','acc6g_2all_mean','acc6g_2all_std','acc6g_3all_sum','acc6g_3all_max','acc6g_3all_min','acc6g_3all_max-min',
'acc6g_3all_mean','acc6g_3all_std','gyro_1all_sum','gyro_1all_max','gyro_1all_min','gyro_1all_max-min','gyro_1all_mean','gyro_1all_std',
'gyro_2all_sum','gyro_2all_max','gyro_2all_min','gyro_2all_max-min','gyro_2all_mean','gyro_2all_std','gyro_3all_sum','gyro_3all_max',
'gyro_3all_min','gyro_3all_max-min','gyro_3all_mean','gyro_3all_std','mag_1all_sum','mag_1all_max','mag_1all_min','mag_1all_max-min',
'mag_1all_mean','mag_1all_std','mag_2all_sum','mag_2all_max','mag_2all_min','mag_2all_max-min','mag_2all_mean','mag_2all_std',
'mag_3all_sum','mag_3all_max','mag_3all_min','mag_3all_max-min','mag_3all_mean','mag_3all_std','orient_1all_sum','orient_1all_max',
'orient_1all_min','orient_1all_max-min','orient_1all_mean','orient_1all_std','orient_2all_sum','orient_2all_max','orient_2all_min',
'orient_2all_max-min','orient_2all_mean','orient_2all_std','orient_3all_sum','orient_3all_max','orient_3all_min','orient_3all_max-min',
'orient_3all_mean','orient_3all_std','orient_4all_sum','orient_4all_max','orient_4all_min','orient_4all_max-min','orient_4all_mean',
'orient_4all_std']
target = 'label'
Random forest with cross-validation using PySpark
def spark_rf_classifier(train, test, features, label):
    mlassembler = VectorAssembler(inputCols=features, outputCol="features")
    rf = RandomForestClassifier(labelCol=label, featuresCol="features")
    rfpipeline = SparkPipeline(stages=[mlassembler, rf])
    rfparamGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5, 10, 20]).addGrid(rf.numTrees, [100, 250, 500]).build()
    rfevaluator = MulticlassClassificationEvaluator(labelCol=label, predictionCol="prediction", metricName="accuracy")
    rfcrossval = CrossValidator(estimator=rfpipeline,
                                estimatorParamMaps=rfparamGrid,
                                evaluator=rfevaluator,
                                numFolds=3, parallelism=5)
    rfmodel = rfcrossval.fit(train)
    rfpredict = rfmodel.transform(test)
    rf_accuracy = rfevaluator.evaluate(rfpredict, {rfevaluator.metricName: "accuracy"})
    rf_precision = rfevaluator.evaluate(rfpredict, {rfevaluator.metricName: "weightedPrecision"})
    rf_recall = rfevaluator.evaluate(rfpredict, {rfevaluator.metricName: "weightedRecall"})
    rf_f1 = rfevaluator.evaluate(rfpredict, {rfevaluator.metricName: "f1"})
    return ("Random Forest Model", rfmodel, rfpredict, rf_accuracy, rf_precision, rf_recall, rf_f1, rfevaluator, rfcrossval)
result = spark_rf_classifier(train, test,features,target)
print(result[1])
best_pipeline = result[1].bestModel
print(best_pipeline)
best_pipeline.stages
CrossValidatorModel_4b37928f1bbcf19ad86d
PipelineModel_474599db674a829f00e6
[VectorAssembler_409d85531cdc48ea7dd4,
RandomForestClassificationModel (uid=RandomForestClassifier_410bbd81e41f7e012fe4) with 100 trees]
From the cross-validation we have our best random forest model; let's check the best parameters of the model.
best_rf_model = best_pipeline.stages[1]
best_rf_model.getNumTrees
100
rf = RandomForestClassifier(labelCol = 'label', featuresCol="features")
paramGrid = ParamGridBuilder().addGrid(rf.maxDepth, [5,10,20]).addGrid(rf.numTrees, [100,250,500]).build()
java_model = best_rf_model._java_obj
{param.name: java_model.getOrDefault(java_model.getParam(param.name))
for param in paramGrid[0]}
{'maxDepth': 20, 'numTrees': 100}
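The full cross-validation results can also be inspected: CrossValidatorModel keeps the average metric for every grid point in avgMetrics, in the same order as the parameter maps. A sketch, assuming paramGrid above is rebuilt with the same grids and ordering as rfparamGrid inside spark_rf_classifier:
cv_model = result[1]
for params, metric in zip(paramGrid, cv_model.avgMetrics):
    print({p.name: v for p, v in params.items()}, round(metric, 4))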
We have performed hyperparameter tuning for the random forest model and identified the hyperparameters with the best validation metrics.
Training the final model and saving it for prediction
mlassembler = VectorAssembler(inputCols=features,outputCol="features")
final_rf = RandomForestClassifier(maxDepth=20,numTrees=100,labelCol = 'label', featuresCol="features")
final_rfpipeline = SparkPipeline(stages=[mlassembler,final_rf])
final_rf_model = final_rfpipeline.fit(train)
final_rf_model.write().overwrite().save('/mnt/data/scripts/notebooks/ashwin/models/final_rf_model_trained')
model = PipelineModel.load('/mnt/data/scripts/notebooks/ashwin/models/final_rf_model_trained')
preds = model.transform(test)
preds.groupBy('label','prediction').count().show()
+-----+----------+------+
|label|prediction| count|
+-----+----------+------+
| 0| 0.0|113181|
| 1| 2.0| 250|
| 2| 3.0| 1|
| 3| 2.0| 44|
| 3| 3.0| 99606|
| 2| 2.0|106191|
| 1| 1.0| 68570|
+-----+----------+------+
As we can see, most of the predictions match the actual labels and only a few rows are misclassified. This model performs very well for this classification task.
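For a more compact view, the same label-versus-prediction counts can be pivoted into a contingency table with the DataFrame crosstab helper. A sketch (output not shown):
preds.crosstab('label', 'prediction').orderBy('label_prediction').show()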
Extracting Feature importances
feat_imp = model.stages[-1].featureImportances
def ExtractFeatureImp(featureImp, dataset, featuresCol):
    list_extract = []
    for i in dataset.schema[featuresCol].metadata["ml_attr"]["attrs"]:
        list_extract = list_extract + dataset.schema[featuresCol].metadata["ml_attr"]["attrs"][i]
    varlist = pd.DataFrame(list_extract)
    varlist['score'] = varlist['idx'].apply(lambda x: featureImp[x])
    return varlist.sort_values('score', ascending=False)
feature_importances = ExtractFeatureImp(feat_imp, preds, "features")
feat_imp_df = pd.DataFrame(feature_importances).sort_values(by=['score'],ascending=False)
feat_imp_df.head()
 | idx | name | score
---|---|---|---
0 | 0 | heart_rate | 0.155386 |
52 | 52 | tempall_sum | 0.039391 |
1 | 1 | hand_temp | 0.036727 |
54 | 54 | tempall_min | 0.035904 |
18 | 18 | chest_temp | 0.034430 |
feat_imp_df.drop(columns=['idx'],inplace=True)
feat_imp_df.set_index('name',inplace=True)
feat_imp_df.head(20).plot(kind='barh',figsize=(16,8))
plt.title('Feature Importances')
Text(0.5,1,'Feature Importances')
As we can see from the extracted feature importances, heart rate and temperature-related features play an important role in the classification trees. Overall, the engineered features show strong importances in our model compared with the original raw features.
Finally, note that BinaryClassificationEvaluator and ROC AUC apply only to binary problems; for this four-class task the overall test performance is evaluated with the multiclass evaluator instead:
eval_rf = MulticlassClassificationEvaluator(predictionCol='prediction', labelCol='label', metricName='accuracy')
eval_rf.evaluate(preds)
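Per-class precision and recall can additionally be computed with the RDD-based MulticlassMetrics helper from pyspark.mllib. A sketch (the numbers are not re-run here):
from pyspark.mllib.evaluation import MulticlassMetrics
pred_and_label = preds.select(fn.col('prediction'), fn.col('label').cast('double')).rdd.map(tuple)
metrics = MulticlassMetrics(pred_and_label)
for lbl in [0.0, 1.0, 2.0, 3.0]:
    print(lbl, metrics.precision(lbl), metrics.recall(lbl))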