The Transactions dataset should have the following attributes for each transaction:
TransID: unique sequential number (integer) from 1 to 5,000,000 (the file has 5M transactions)
CustID: references one of the customer IDs, i.e., from 1 to 50,000 (on average, a customer has 100 transactions)
TransTotal: random number (float) between 10 and 1000
TransNumItems: random number (integer) between 1 and 10
TransDesc: random text, between 20 and 50 characters long
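The attribute list above can be turned into a small generator. The following is only a sketch in plain Scala (no Spark needed): the case class `Transaction`, the object `TransactionGenerator`, and the `numCustomers` and `seed` parameters are names assumed here for illustration, not part of the assignment.

```scala
import scala.util.Random

// One row of the Transactions dataset, using the field names from the spec.
final case class Transaction(
    transId: Int,
    custId: Int,
    transTotal: Float,
    transNumItems: Int,
    transDesc: String
)

object TransactionGenerator {
  // Lazily generates `n` transactions. `numCustomers` and `seed` are
  // illustrative parameters (assumptions), not taken from the assignment.
  def generate(n: Int, numCustomers: Int = 50000, seed: Long = 42L): Iterator[Transaction] = {
    val rng = new Random(seed)
    Iterator.range(1, n + 1).map { id =>
      val descLen = 20 + rng.nextInt(31) // 20..50 characters
      Transaction(
        transId = id,                              // sequential and unique
        custId = 1 + rng.nextInt(numCustomers),    // 1..numCustomers
        transTotal = 10f + rng.nextFloat() * 990f, // between 10 and 1000
        transNumItems = 1 + rng.nextInt(10),       // 1..10
        transDesc = rng.alphanumeric.take(descLen).mkString
      )
    }
  }
}
```

Calling `TransactionGenerator.generate(5000000)` would produce the 5M rows one at a time, so they can be written to a file without holding the whole dataset in memory.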
--------------------------------------------------------------------------------------------
1) T1: Filter out (drop) the transactions from T whose total amount is less than $200.
2) T2: Over T1, group the transactions by number of items, and for each group
calculate the sum, average, min, and max of the total amounts.
3) Report T2 back to the client side.
4) T3: Over T1, group the transactions by customer ID, and for each group report the
customer ID and the number of transactions.
5) T4: Filter out (drop) the transactions from T whose total amount is less than $600.
6) T5: Over T4, group the transactions by customer ID, and for each group report the
customer ID and the number of transactions.
Queries in Spark SQL:
Project3:
scala> import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.SparkSession
scala> import spark.implicits._
import spark.implicits._
scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
1)
scala> val input = sqlContext.read.json("t.json")
input: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> input.registerTempTable("trans")
warning: there was one deprecation warning; re-run with -deprecation for details
scala> val result = sqlContext.sql("SELECT * from trans where ttotal < 200")
result: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> result.show()
+-----+-------------------+----+------+------+
|  cid|               desc| tid|titems|ttotal|
+-----+-------------------+----+------+------+
|10111|computeraccessories| 112|     3|   150|
|10111|computeraccessories| 114|     5|   125|
|10111|computeraccessories| 116|     4|   180|
|10114|computeraccessories| 118|     3|   180|
|10115|computeraccessories| 119|     5|   100|
|10112|computeraccessories|1114|     5|    50|
|10112|computeraccessories|1115|     2|   150|
+-----+-------------------+----+------+------+
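The task statement asks to drop transactions below $200, which reads as keeping the rows with ttotal >= 200, whereas the query above selects ttotal < 200. A minimal sketch of the first reading follows; the helper `keepAtLeast` and the in-memory sample rows are assumptions for illustration (only the column names come from t.json). The equivalent SQL predicate would be `WHERE ttotal >= 200`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object T1Filter {
  // Keeps transactions with ttotal >= minTotal, i.e. drops those below the threshold.
  def keepAtLeast(trans: DataFrame, minTotal: Double): DataFrame =
    trans.filter(trans("ttotal") >= minTotal)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("T1Filter").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical sample rows using the column names of t.json.
    val trans = Seq(
      (10111L, "computeraccessories", 111L, 5L, 500L),
      (10111L, "computeraccessories", 112L, 3L, 150L),
      (10112L, "computeraccessories", 113L, 4L, 200L)
    ).toDF("cid", "desc", "tid", "titems", "ttotal")

    // Transactions 111 and 113 remain; 112 (total 150) is dropped.
    keepAtLeast(trans, 200).show()
    spark.stop()
  }
}
```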
2)
scala> val result = sqlContext.sql("SELECT * from trans where ttotal < 200")
result: org.apache.spark.sql.DataFrame = [cid: bigint, desc: string ... 3 more fields]
scala> result.groupBy("titems").sum("ttotal").show()
+------+-----------+
|titems|sum(ttotal)|
+------+-----------+
|     5|        275|
|     3|        330|
|     2|        150|
|     4|        180|
+------+-----------+
scala> result.groupBy("titems").min("ttotal").show()
+------+-----------+
|titems|min(ttotal)|
+------+-----------+
|     5|         50|
|     3|        150|
|     2|        150|
|     4|        180|
+------+-----------+
scala> result.groupBy("titems").max("ttotal").show()
+------+-----------+
|titems|max(ttotal)|
+------+-----------+
|     5|        125|
|     3|        180|
|     2|        150|
|     4|        180|
+------+-----------+
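The task also asks for the average, and the sum, min and max can be computed in the same pass rather than with one groupBy per statistic. A possible sketch using `agg` is shown below; the object `T2Aggregates`, the output column aliases, and the sample rows are assumptions for illustration. Calling `collect()` on the result brings the rows back to the driver, which is one way to report T2 to the client side.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{avg, max, min, sum}

object T2Aggregates {
  // One row per titems value with the sum, average, min and max of ttotal.
  def byNumItems(t1: DataFrame): DataFrame =
    t1.groupBy("titems").agg(
      sum("ttotal").as("sum_total"),
      avg("ttotal").as("avg_total"),
      min("ttotal").as("min_total"),
      max("ttotal").as("max_total")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("T2Aggregates").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical (titems, ttotal) rows standing in for T1.
    val t1 = Seq((5L, 500L), (5L, 400L), (4L, 200L)).toDF("titems", "ttotal")

    // collect() returns the aggregated rows to the driver (client side).
    byNumItems(t1).collect().foreach(println)
    spark.stop()
  }
}
```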
3)
4)
scala> result.groupBy("cid").count().show()
+-----+-----+
|  cid|count|
+-----+-----+
|10112|    2|
|10114|    1|
|10111|    3|
|10115|    1|
+-----+-----+
5)
scala> val result3=result2.filter(result2("ttotal")<600)
result3: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [cid: bigint, desc: string ... 3 more fields]
scala> result3.show
+-----+-------------------+----+------+------+
|  cid|               desc| tid|titems|ttotal|
+-----+-------------------+----+------+------+
|10111|computeraccessories| 111|     5|   500|
|10111|computeraccessories| 112|     3|   150|
|10112|computeraccessories| 113|     4|   200|
|10111|computeraccessories| 114|     5|   125|
|10112|computeraccessories| 115|     5|   400|
|10111|computeraccessories| 116|     4|   180|
|10113|computeraccessories| 117|     5|   500|
|10114|computeraccessories| 118|     3|   180|
|10115|computeraccessories| 119|     5|   100|
|10114|computeraccessories|1111|     3|   500|
|10112|computeraccessories|1114|     5|    50|
|10112|computeraccessories|1115|     2|   150|
+-----+-------------------+----+------+------+
6)
scala> result3.groupBy("cid").count().show()
+-----+-----+
|  cid|count|
+-----+-----+
|10112|    4|
|10114|    2|
|10111|    4|
|10113|    1|
|10115|    1|
+-----+-----+
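As with T1, the statement of T4 (drop transactions below $600) reads as keeping ttotal >= 600, while the filter above keeps ttotal < 600. Under that reading, T4 and T5 could be expressed as a single Spark SQL statement. This is only a sketch: the helper `countsAtLeast`, the alias `cnt`, and the sample rows are assumptions for illustration.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object T5Counts {
  // Registers `trans` as a temp view and counts, per customer, the transactions
  // whose total is at least `minTotal` (T4 followed by T5 in one statement).
  def countsAtLeast(spark: SparkSession, trans: DataFrame, minTotal: Int): DataFrame = {
    trans.createOrReplaceTempView("trans")
    spark.sql(
      s"SELECT cid, COUNT(*) AS cnt FROM trans WHERE ttotal >= $minTotal GROUP BY cid"
    )
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("T5Counts").master("local[*]").getOrCreate()
    import spark.implicits._

    // Hypothetical (cid, ttotal) rows; only the column names come from t.json.
    val trans = Seq(
      (10111L, 650L),
      (10111L, 700L),
      (10112L, 500L),
      (10112L, 900L)
    ).toDF("cid", "ttotal")

    // Customer 10111 has two qualifying transactions, customer 10112 has one.
    countsAtLeast(spark, trans, 600).show()
    spark.stop()
  }
}
```

`createOrReplaceTempView` is used here in place of `registerTempTable`, which is the call behind the deprecation warning in the transcript.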