public class RDDConverterUtilsExt extends Object
Modifier and Type | Class and Description |
---|---|
static class | RDDConverterUtilsExt.AddRowID |
static class | RDDConverterUtilsExt.RDDConverterTypes |
Constructor and Description |
---|
RDDConverterUtilsExt() |
Modifier and Type | Method and Description |
---|---|
static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | addIDToDataFrame(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, org.apache.spark.sql.SparkSession sparkSession, String nameOfCol) Add element indices as a new column to a DataFrame. |
static MatrixBlock | allocateDenseOrSparse(int rlen, int clen, boolean isSparse) |
static MatrixBlock | allocateDenseOrSparse(long rlen, long clen, boolean isSparse) |
static byte[] | convertMBtoPy4JDenseArr(MatrixBlock mb) |
static MatrixBlock | convertPy4JArrayToMB(byte[] data, int rlen, int clen) |
static MatrixBlock | convertPy4JArrayToMB(byte[] data, int rlen, int clen, boolean isSparse) |
static MatrixBlock | convertPy4JArrayToMB(byte[] data, long rlen, long clen) |
static MatrixBlock | convertPy4JArrayToMB(byte[] data, long rlen, long clen, boolean isSparse) |
static MatrixBlock | convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int rlen, int clen, int nnz) |
static MatrixBlock | convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, long rlen, long clen, long nnz) |
static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> | coordinateMatrixToBinaryBlock(org.apache.spark.api.java.JavaSparkContext sc, org.apache.spark.mllib.linalg.distributed.CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) See the example usage in the method detail below. |
static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> | coordinateMatrixToBinaryBlock(org.apache.spark.SparkContext sc, org.apache.spark.mllib.linalg.distributed.CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks) |
static void | copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) |
static void | copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) |
static void | copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen) |
static void | copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen) |
static void | postProcessAfterCopying(MatrixBlock ret) |
static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | projectColumns(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, ArrayList<String> columns) |
static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> | stringDataFrameToVectorDataFrame(org.apache.spark.sql.SparkSession sparkSession, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> inputDF) Convert a dataframe of comma-separated string rows to a dataframe of ml.linalg.Vector rows. |
public static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> coordinateMatrixToBinaryBlock(org.apache.spark.api.java.JavaSparkContext sc, org.apache.spark.mllib.linalg.distributed.CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks)
Example usage (spark-shell, where `sc` is the shell's SparkContext):

```scala
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
import org.apache.sysml.runtime.matrix.MatrixCharacteristics
import org.apache.spark.api.java.JavaSparkContext
import org.apache.spark.mllib.linalg.distributed.MatrixEntry
import org.apache.spark.mllib.linalg.distributed.CoordinateMatrix

// Parse "row col value" triples, dropping explicit zeros
val matRDD = sc.textFile("ratings.text")
  .map(_.split(" "))
  .map(x => new MatrixEntry(x(0).toLong, x(1).toLong, x(2).toDouble))
  .filter(_.value != 0)
  .cache
// The converter expects 1-based indices, so reject any zero index
require(matRDD.filter(x => x.i == 0 || x.j == 0).count == 0, "Expected 1-based ratings file")
val nnz = matRDD.count
val numRows = matRDD.map(_.i).max
val numCols = matRDD.map(_.j).max
val coordinateMatrix = new CoordinateMatrix(matRDD, numRows, numCols)
// 1000 x 1000 blocking with the known number of non-zeros
val mc = new MatrixCharacteristics(numRows, numCols, 1000, 1000, nnz)
val binBlocks = RDDConverterUtilsExt.coordinateMatrixToBinaryBlock(new JavaSparkContext(sc), coordinateMatrix, mc, true)
```
Parameters:
- sc - java spark context
- input - coordinate matrix
- mcIn - matrix characteristics
- outputEmptyBlocks - if true, inject empty blocks if necessary

Returns:
- JavaPairRDD<MatrixIndexes, MatrixBlock>
public static org.apache.spark.api.java.JavaPairRDD<MatrixIndexes,MatrixBlock> coordinateMatrixToBinaryBlock(org.apache.spark.SparkContext sc, org.apache.spark.mllib.linalg.distributed.CoordinateMatrix input, MatrixCharacteristics mcIn, boolean outputEmptyBlocks)
public static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> projectColumns(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, ArrayList<String> columns)
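A minimal spark-shell sketch, assuming projectColumns returns a copy of the input DataFrame restricted to the listed columns (which the signature suggests); the DataFrame and column names are illustrative:

```scala
import java.util.ArrayList
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
import spark.implicits._

// Hypothetical three-column input
val df = Seq((1, "a", 2.0), (2, "b", 3.0)).toDF("user", "item", "rating")
val columns = new ArrayList[String]()
columns.add("user")
columns.add("rating")
val projected = RDDConverterUtilsExt.projectColumns(df, columns)
projected.show()
```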
public static MatrixBlock convertPy4JArrayToMB(byte[] data, long rlen, long clen)
public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen)
public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, long rlen, long clen, long nnz)
public static MatrixBlock convertSciPyCOOToMB(byte[] data, byte[] row, byte[] col, int rlen, int clen, int nnz)
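A hedged sketch of calling the SciPy COO converter from Scala. The byte layouts below are assumptions not confirmed by this page: values as 8-byte doubles, row/column indices as 4-byte ints (0-based, matching SciPy COO), little-endian throughout.

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt

// Assumed layout: little-endian doubles for values, little-endian 4-byte ints
// for 0-based row/col indices
def doubles(xs: Double*): Array[Byte] = {
  val buf = ByteBuffer.allocate(xs.length * 8).order(ByteOrder.LITTLE_ENDIAN)
  xs.foreach(x => buf.putDouble(x)); buf.array()
}
def ints(xs: Int*): Array[Byte] = {
  val buf = ByteBuffer.allocate(xs.length * 4).order(ByteOrder.LITTLE_ENDIAN)
  xs.foreach(x => buf.putInt(x)); buf.array()
}
// Two non-zeros of a 3 x 3 sparse matrix: (0,1)=1.5 and (2,2)=2.5
val mb = RDDConverterUtilsExt.convertSciPyCOOToMB(
  doubles(1.5, 2.5), ints(0, 2), ints(1, 2), 3, 3, 2)
```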
public static MatrixBlock convertPy4JArrayToMB(byte[] data, long rlen, long clen, boolean isSparse)
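A hedged sketch of the dense Py4J round-trip through convertPy4JArrayToMB and convertMBtoPy4JDenseArr. The byte encoding is an assumption (row-major 8-byte doubles, little-endian), since this page does not document the layout:

```scala
import java.nio.{ByteBuffer, ByteOrder}
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt

// 2 x 3 dense matrix, row-major (layout assumed)
val values = Array(1.0, 2.0, 3.0, 4.0, 5.0, 6.0)
val buf = ByteBuffer.allocate(values.length * 8).order(ByteOrder.LITTLE_ENDIAN)
values.foreach(v => buf.putDouble(v))
val mb = RDDConverterUtilsExt.convertPy4JArrayToMB(buf.array(), 2, 3)
// And back to the Py4J byte[] representation
val bytes = RDDConverterUtilsExt.convertMBtoPy4JDenseArr(mb)
```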
public static MatrixBlock allocateDenseOrSparse(int rlen, int clen, boolean isSparse)
public static MatrixBlock allocateDenseOrSparse(long rlen, long clen, boolean isSparse)
public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen)
public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, int numRowsPerBlock, int rlen, int clen)
public static void copyRowBlocks(MatrixBlock mb, int rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen)
public static void copyRowBlocks(MatrixBlock mb, long rowIndex, MatrixBlock ret, long numRowsPerBlock, long rlen, long clen)
public static void postProcessAfterCopying(MatrixBlock ret)
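Taken together, allocateDenseOrSparse, copyRowBlocks, and postProcessAfterCopying suggest a copy-assembly pattern: allocate a full-size target block, copy each row block into it, then finalize. A hedged sketch under that reading, where `blocks` is a hypothetical local sequence of (row-block index, block) pairs:

```scala
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
import org.apache.sysml.runtime.matrix.data.MatrixBlock

val rlen = 10000L
val clen = 100L
val numRowsPerBlock = 1000L
// Hypothetical input; a real caller would collect these pairs from wherever
// the row blocks are produced (indexing convention assumed)
val blocks: Seq[(Long, MatrixBlock)] = Seq((1L, new MatrixBlock(1000, 100, false)))
// Allocate the full-size target once (dense here)
val ret = RDDConverterUtilsExt.allocateDenseOrSparse(rlen, clen, false)
for ((rowIndex, mb) <- blocks)
  RDDConverterUtilsExt.copyRowBlocks(mb, rowIndex, ret, numRowsPerBlock, rlen, clen)
// Finalize the target after all copies (exact effect assumed, e.g. recomputing
// non-zero counts)
RDDConverterUtilsExt.postProcessAfterCopying(ret)
```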
public static MatrixBlock convertPy4JArrayToMB(byte[] data, int rlen, int clen, boolean isSparse)
public static byte[] convertMBtoPy4JDenseArr(MatrixBlock mb)
public static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> addIDToDataFrame(org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> df, org.apache.spark.sql.SparkSession sparkSession, String nameOfCol)
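A minimal spark-shell sketch; the input DataFrame and index column name are illustrative:

```scala
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
import spark.implicits._

val df = Seq(1.2, 3.4, 5.6).toDF("value") // hypothetical input
// Append an index column named "__INDEX" (any unused name works)
val dfWithID = RDDConverterUtilsExt.addIDToDataFrame(df, spark, "__INDEX")
dfWithID.show()
```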
Parameters:
- df - input data frame
- sparkSession - the Spark Session
- nameOfCol - name of index column

public static org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> stringDataFrameToVectorDataFrame(org.apache.spark.sql.SparkSession sparkSession, org.apache.spark.sql.Dataset<org.apache.spark.sql.Row> inputDF)
Convert a dataframe of comma-separated string rows to a dataframe of ml.linalg.Vector rows.
Example input rows:
```
((1.2, 4.3, 3.4))
(1.2, 3.4, 2.2)
[[1.2, 34.3, 1.2, 1.25]]
[1.2, 3.4]
```
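A minimal spark-shell sketch, assuming the input is a single string column of rows in the formats shown above (the column name is hypothetical):

```scala
import org.apache.sysml.runtime.instructions.spark.utils.RDDConverterUtilsExt
import spark.implicits._

val inputDF = Seq("(1.2, 4.3, 3.4)", "[1.2, 34.3, 1.2]").toDF("strings")
val vecDF = RDDConverterUtilsExt.stringDataFrameToVectorDataFrame(spark, inputDF)
vecDF.show(false)
```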
Parameters:
- sparkSession - Spark Session
- inputDF - dataframe of comma-separated row strings to convert to dataframe of ml.linalg.Vector rows

Copyright © 2018 The Apache Software Foundation. All rights reserved.