それでは毛玉諸君、これにて失敬

日々の精進を備忘録的に綴ります。

pyspark導入、サンプルデータを作成してみる

ネトフリ版デビルマンを観て衝撃を受けたko_y346です。
原作のおどろおどろしい雰囲気が残っててとても良かった…

この記事は?

業務で使用するpysparkについての知見を個人的にまとめているのですが、
せっかくなので共有しようと思います。

今回はdockerを使ってローカルでpysparkの環境を用意するとこ、適当にサンプルデータを用意するところまでをやります。

環境

windows10(MINGW64_NT-10.0-19043)
bash

環境づくり

この記事が参考になりました*1
ありがたいことに*2pyspark環境のdocker imageが既にあるので、それを引っ張ってきます。

docker pull jupyter/pyspark-notebook

今落としたイメージからコンテナを作成します。
bashなので頭にwinptyと付けないと怒られます*3
また、今回はjupyter notebook上で作業するため、portを指定しておきます。

winpty docker run -it --name spark_test -p 8889:8889 jupyter/pyspark-notebook:latest bash

無事に仮想環境に入れたら、先ほどのportを使ってjupyter notebookを起動します。

jupyter notebook --port 8889 --ip=0.0.0.0 --allow-root

表示されたリンクから、ブラウザ上で起動できたら成功です。

サンプルデータを作成してみる

まずは諸々import
(自分が試したときはpysparkが入ってない!と怒られたので、pip installで入れました)

!pip install pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as fn
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType

import random, string
from typing import List
from pyspark.sql import DataFrame


spark: SparkSession = SparkSession.builder.appName("SimpleApp").getOrCreate()
# print(spark)

ランダムな文字列と数値を返す関数を用意したので、これで適当なデータを作ります。

def get_random_name(n):
    """
    長さnのランダムな文字列を生成する
    
    Parameter
    ---------
        n: int
            文字数
    """
    
    return "".join(random.choices(string.ascii_letters + string.digits, k=n))


def get_random_num(n_min=1, n_max=100, int_flg=False):
    """
    ランダムな数値を返す
    """
    ret = random.uniform(n_min, n_max)
    
    if int_flg:
        ret = int(ret)
    return ret

サンプルデータ作成

def get_sample_data(size: int,  name_len=8):
    """
    サンプルデータ生成

    Parameters
    ----------
        size: int
            レコード数

    Examples
    --------
        data = get_sample_data(3,  name_len=8)
        print(data)
        
        > [('FaU2IrJ7', 41, 100.68, 67.56),
        >  ('LZeq1NoP', 69, 135.13, 82.93),
        >  ('fzWslHC8', 79, 111.62, 41.1)]    
    """
    
    shop_names = [get_random_name(name_len) for _ in range(size)]
    employee_nums = [get_random_num(n_min=1, n_max=100, int_flg=True) for _ in range(size)]
    areas = [round(get_random_num(n_min=100, n_max=200), 2) for _ in range(size)]
    total_sales = [round(get_random_num(n_min=40, n_max=100), 2) for _ in range(size)]
    
    data = []
    
    for shop_name, employee_num, area, total_sale in zip(shop_names,
                                                         employee_nums, 
                                                         areas, total_sales):
        data.append((shop_name, employee_num, area, total_sale))
    
    return data

この関数を使ってデータを作成し、スキーマを定義してテーブル化します。

N_LEN = 10

data = get_sample_data(N_LEN,  name_len=8)

schema = StructType([StructField("shop_name", StringType(), True),
                     StructField("employee_num", IntegerType(), True),
                     StructField("area", DoubleType(), True),
                     StructField("total_sale", DoubleType(), True)])

rdd = spark.sparkContext.parallelize(data)
df = spark.createDataFrame(rdd, schema) 
+---------+------------+------+----------+
|shop_name|employee_num|  area|total_sale|
+---------+------------+------+----------+
| hFdR3ms8|          60|127.92|     69.53|
| ZTtLeNsB|          30|196.62|     47.83|
| HLvgv20p|          49|163.81|     65.76|
| ksE9oNVq|           8|106.29|      93.7|
| CjTFtx7w|          95|157.12|     70.28|
| tR1RFKCN|          72|136.97|     78.02|
| ejcoslwG|          77|166.75|      75.2|
| 5xk66yfY|          71|137.04|     63.29|
| jngIjdVe|          43|167.42|     67.68|
| zn6ppB4o|           5|170.14|     80.41|
+---------+------------+------+----------+

ものすごく雑ですが、お店の従業員数、面積、売り上げのデータが出来ました。
次回があれば、これを使って何かやろうと思います。

*1:mseeeen.msen.jp

*2:hub.docker.com

*3:この記事で詳しく解説されています。WindowsのコンソールプログラムとUNIXの仮想端末を通信させるために必要らしいです。qiita.com