Spark SQL, DataFrames, Datasets
- Spark 모듈중에 하나
- 구조화된 데이터 처리를 위한 모듈
Starting Point
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession
.builder()
.appName("Java Spark SQL basic example")
.config("spark.some.config.option", "some-value")
.getOrCreate();
- hive 기능도 제공을 한다 (spark 2.0 builtin support)
Creating DataFrames(Dataset<Row>)
- RDD에서 DataFrames을 만들수도 있다.
- Hive Table
- Spark Data Source
- Parquet File
- ORC File
- JSON File
- Hive
- JDBC
- Avro
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");
// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Creating Dataset<T>
- Serialize를 이용해 Encoder를 사용하여 Type이 있는 Dataset를 생성할수 있다.
Java Beans를 이용하여 Dataset를 생성을 할수 있다.
import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
public static class Person implements Serializable {
private String name;
private int age;
setter...
getter..
}
Person person = new Person();
person.setName("Andy");
person.setAge(32);
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
Collections.singletonList(person),
personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+
자주 사용하는 Encoder는 미리 정의가 되어있다.
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(
(MapFunction<Integer, Integer>) value -> value + 1,
integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]
Class를 이용하여 Dataframe을 Dataset를 변환할수 있다.
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age| name|
// +----+-------+
// |null|Michael|
// | 30| Andy|
// | 19| Justin|
// +----+-------+
Dataframe와 Dataset<T>의 차이
- Dataset를 Spark 2.0에 나온 새로운 타입을 가지고 있는 데이터 모델
- Untype Dataset을 Dataframe 이다
- Dataset<Row> = DataFrame
Example
// To create Dataset<Row> using SparkSession
Dataset<Row> people = spark.read().parquet("...");
Dataset<Row> department = spark.read().parquet("...");
people.filter(people.col("age").gt(30))
.join(department, people.col("deptId").equalTo(department.col("id")))
.groupBy(department.col("name"), people.col("gender"))
.agg(avg(people.col("salary")), max(people.col("age")));
'빅데이터 처리 > Spark' 카테고리의 다른 글
Spark SQL join, group by and functions (0) | 2019.01.08 |
---|---|
Spark 간단하게 하나의 파일로 Write하는 법 (1) | 2019.01.07 |
Spark AWS S3 접근시 400 에러 처리 방법 (0) | 2019.01.07 |
Spark Word Count Example (0) | 2017.08.07 |
Spark RDD (0) | 2017.07.21 |