빅데이터 처리/Spark

Spark SQL, DataFrames, Datasets

BUST 2019. 1. 5. 15:52

Spark SQL, DataFrames, Datasets

  • Spark 모듈중에 하나
  • 구조화된 데이터 처리를 위한 모듈


Starting Point

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();
  • hive 기능도 제공을 한다 (spark 2.0 builtin support)


Creating DataFrames(Dataset<Row>)

  • RDD에서 DataFrames을 만들수도 있다.
  • Hive Table
  • Spark Data Source
    • Parquet File
    • ORC File
    • JSON File
    • Hive
    • JDBC
    • Avro
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+



Creating Dataset<T>

  • Serialize를 이용해 Encoder를 사용하여 Type이 있는 Dataset를 생성할수 있다.

Java Beans를 이용하여 Dataset를 생성을 할수 있다.

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;
  setter...
  getter..
}

Person person = new Person();
person.setName("Andy");
person.setAge(32);

Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+



자주 사용하는 Encoder는 미리 정의가 되어있다.

Encoder<Integer> integerEncoder = Encoders.INT();

Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);

Dataset<Integer> transformedDS = primitiveDS.map(

    (MapFunction<Integer, Integer>) value -> value + 1,

    integerEncoder);

transformedDS.collect(); // Returns [2, 3, 4]


Class를 이용하여 Dataframe을 Dataset를 변환할수 있다.

String path = "examples/src/main/resources/people.json";

Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);

peopleDS.show();

// +----+-------+

// | age|   name|

// +----+-------+

// |null|Michael|

// |  30|   Andy|

// |  19| Justin|

// +----+-------+



Dataframe와 Dataset<T>의 차이

  • Dataset를 Spark 2.0에 나온 새로운 타입을 가지고 있는 데이터 모델
  • Untype Dataset을 Dataframe 이다
    • Dataset<Row> = DataFrame

Example

   // To create Dataset<Row> using SparkSession
   Dataset<Row> people = spark.read().parquet("...");
   Dataset<Row> department = spark.read().parquet("...");

   people.filter(people.col("age").gt(30))
     .join(department, people.col("deptId").equalTo(department.col("id")))
     .groupBy(department.col("name"), people.col("gender"))
     .agg(avg(people.col("salary")), max(people.col("age")));