[pyspark] Data Types in Spark MLLib

Spark 의 ML 라이브러리는 대부분 Dataframe 기반으로 작동하는데,
Matrix 의 경우는 RDD 기반의 MLLib 에만 존재하고 있다. (없애지는 않지만 더이상 개선도 없을거라고)
RDD 기반 개선이 없다고 해도 지나치기는 아쉬워서 기록 차원에서 작동만 시켜보고 넘어감
– Dataframe 기반에서는 Vector 타입만 사용가능
– 이 외의 기능을 원한다면 df.toPandas()Pandas Dataframe 으로 변환 후 사용할 것
– Spark SQL 에서도 Pandas UDF 를 함께 사용하면 속도도 빠름

원문 https://medium.com/analytics-vidhya/data-types-in-spark-mllib-966b4800f893

Local Vector

dense vector: 일반적인 벡터 표기 (모든 값 표기)
sparse vector: ‘0’ 값이 많은 희소 벡터 (0이 아닌 값만 표기)

%pyspark

from pyspark.mllib.linalg import Vectors

## Dense Vector
print(Vectors.dense([1,2,3,4,5,6,0]))
# ==> [1.0,2.0,3.0,4.0,5.0,6.0,0.0]

### SPARSE VECTOR 
### Vectors.sparse( length, index_of_non_zero_values, non_zero_values)
### Indices values should be strictly increasing

print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]))
# ==> (10,[0,1,2,4,5],[1.0,5.0,3.0,5.0,7.0])
print(Vectors.sparse(10, [0,1,2,4,5], [1.0,5.0,3.0,5.0,7]).toArray())
# ==> [1. 5. 3. 0. 5. 7. 0. 0. 0. 0.]

Labeled Point

벡터로 정의된 features 와 함께 정의되는 label 로 이루어짐.
목표가 있는 학습 등에서 label 에 True/False 또는 가중치 등을 설정하면 적당한 자료구조

%pyspark

from pyspark.mllib.regression import LabeledPoint

# set a Label against a Dense Vector
point_1 = LabeledPoint(2,Vectors.dense([1,2,3,4,5]))

# Features 
print(point_1.features)
# ==> [1.0,2.0,3.0,4.0,5.0]
# Label
print(point_1.label)
# ==> 1.0

Local Matrix

Vector와 마찬가지로 dense 와 sparse 가 있다

%pyspark

from pyspark.mllib.linalg import Matrices

# create a dense matrix of 3 Rows and 2 columns
matrix_1 = Matrices.dense(3, 2, [1,2,3,4,5,6])
print(matrix_1)
# ==> DenseMatrix([[1., 4.],[2., 5.],[3., 6.]])
print(matrix_1.toArray())
# ==> [[1. 4.],[2. 5.], [3. 6.]]

# create a sparse matrix
# col 3 x row 3 매트릭스
# colIndices = [0,1,2]; [3] 은 무시!
# rowIndices = [0,0,2]
matrix_2 = Matrices.sparse(3, 3, [0, 1, 2, 3], [0, 0, 2], [9, 6, 8])
print(matrix_2)
# ==> (rowIndex,colIndex) : 3 X 3 CSCMatrix (0,0) 9.0 (0,1) 6.0 (2,2) 8.0 
print(matrix_2.toArray())
# ==> [[9. 6. 0.], [0. 0. 0.], [0. 0. 8.]]

Row Matrix

각 행은 로컬 벡터입니다. 여러 파티션에 행을 저장할 수 있습니다. Random Forest와 같은 알고리즘은 알고리즘이 행을 분할하여 여러 트리를 생성하므로 Row Matrix를 사용하여 구현할 수 있습니다. 한 나무의 결과는 다른 나무에 의존하지 않습니다. 따라서 분산 아키텍처를 활용하고 Random Forest for BigData와 같은 알고리즘에 대해 병렬 처리를 수행 할 수 있습니다.

%pyspark

from pyspark.mllib.linalg.distributed import RowMatrix
# if not installed 'numpy', raise error ==> pip3 install numpy

# Create an RDD of vectors.
rows = sc.parallelize([[1, 2, 3], [4, 5, 6], [7, 8, 9], [10, 11, 12]])

# Create a RowMatrix from an RDD of vectors.
mat = RowMatrix(rows)

# Get its size.
print(mat.numRows())  # 4
print(mat.numCols())  # 3

# Get the rows as an RDD of vectors again.
rowsRDD = mat.rows

Indexed Row Matrix (Row Matrix + index value)

행이 여러 파티션에 저장되지만 정렬 된 방식으로 저장되는 행 행렬과 유사합니다. 인덱스 값이 각 행에 할당됩니다. 시계열 데이터와 같이 순서가 중요한 알고리즘에서 사용됩니다. IndexedRow의 RDD에서 만들 수 있습니다.

%pyspark

from pyspark.mllib.linalg.distributed import IndexedRow, IndexedRowMatrix

# create RDD
indexed_rows = sc.parallelize([
    IndexedRow(0, [0,1,2]),
    IndexedRow(1, [1,2,3]),
    IndexedRow(2, [3,4,5]),
    IndexedRow(3, [4,2,3]),
    IndexedRow(4, [2,2,5]),
    IndexedRow(5, [4,5,5])
])

# create IndexedRowMatrix
indexed_rows_matrix = IndexedRowMatrix(indexed_rows)
print(indexed_rows_matrix.numRows())
# ==> 6
print(indexed_rows_matrix.numCols())
# ==> 3

Coordinate Matrix

MatrixEntry의 RDD에서 좌표 행렬을 만들 수 있습니다. 행렬의 두 차원이 모두 큰 경우에만 좌표 행렬을 사용합니다.

%pyspark

from pyspark.mllib.linalg.distributed import CoordinateMatrix, MatrixEntry

# Create an RDD of coordinate entries with the MatrixEntry class:
matrix_entries = sc.parallelize([
    MatrixEntry(0, 5, 2), 
    MatrixEntry(1, 1, 1), 
    MatrixEntry(1, 5, 4)
    ])
    
# Create an CoordinateMatrix from an RDD of MatrixEntries.
c_matrix = CoordinateMatrix(matrix_entries)
print(c_matrix.numCols())
# ==> 6
print(c_matrix.numRows())  # 좌표라서 rowSize = 2 인가?
# ==> 2

Block Matrix

블록 매트릭스에서 우리는 다른 기계에 큰 매트릭스의 다른 부분 행렬을 저장할 수 있습니다. 블록 치수를 지정해야합니다. 아래 예제와 같이 3X3이 있고 각 블록에 대해 좌표를 제공하여 행렬을 지정할 수 있습니다.

%pyspark

# import the libraries
from pyspark.mllib.linalg import Matrices
from pyspark.mllib.linalg.distributed import BlockMatrix

# Create an RDD of sub-matrix blocks.
blocks = sc.parallelize([
    ((0, 0), Matrices.dense(3, 3, [1, 2, 1, 2, 1, 2, 1, 2, 1])),
    ((1, 1), Matrices.dense(3, 3, [3, 4, 5, 3, 4, 5, 3, 4, 5])),
    ((2, 0), Matrices.dense(3, 3, [1, 1, 1, 1, 1, 1, 1, 1, 1]))
    ])
    
# Create a BlockMatrix from an RDD of sub-matrix blocks  of size 3X3
b_matrix = BlockMatrix(blocks, 3, 3)

print(b_matrix.colsPerBlock)  # columns per block
# ==> 3
print(b_matrix.rowsPerBlock)  # rows per block
# ==> 3

# convert the block matrix to local matrix
local_mat = b_matrix.toLocalMatrix()
# print local matrix
print(local_mat.toArray())
"""
[[1. 2. 1. 0. 0. 0.]
 [2. 1. 2. 0. 0. 0.]
 [1. 2. 1. 0. 0. 0.]
 [0. 0. 0. 3. 3. 3.]
 [0. 0. 0. 4. 4. 4.]
 [0. 0. 0. 5. 5. 5.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]
 [1. 1. 1. 0. 0. 0.]]
""" 

댓글 남기기

아래 항목을 채우거나 오른쪽 아이콘 중 하나를 클릭하여 로그 인 하세요:

WordPress.com 로고

WordPress.com의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Google photo

Google의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Twitter 사진

Twitter의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

Facebook 사진

Facebook의 계정을 사용하여 댓글을 남깁니다. 로그아웃 /  변경 )

%s에 연결하는 중