Show Menu
トピック×

SDK 開発者ガイド

The Model Authoring SDK enables you to develop custom machine learning Recipes and Feature Pipelines which can be used in Adobe Experience Platform Data Science Workspace, providing implementable templates in PySpark and Spark (Scala).
このドキュメントは、モデルオーサリングSDK内の様々なクラスに関する情報を提供します。

DataLoader

DataLoader クラスは、生の入力データを取得、フィルタリング、返す操作に関連するすべてをカプセル化します。入力データの例としては、トレーニング、スコアリング、特徴エンジニアリングなどがあります。データローダーは DataLoader 抽象クラスを拡張し、 load 抽象メソッドをオーバーライドする必要があります。
PySpark
次の表に、PySpark データローダークラスの抽象メソッドを示します。
メソッドと説明 パラメーター
load(self, configProperties, spark)
Pandas DataFrame として Platform データを読み込み、返します。
  • self :自己参照
  • configProperties :設定プロパティのマップ
  • spark :Spark セッション
Spark
The following table describes the abstract methods of a Spark Data Loader class:
メソッドと説明 パラメーター
load(configProperties, sparkSession)
Platform データを DataFrame として読み込み、返します。
  • configProperties :設定プロパティのマップ
  • sparkSession :Spark セッション

Load data from a Platform dataset

The following example retrieves Platform data by ID and returns a DataFrame, where the dataset ID ( datasetId ) is a defined property in the configuration file.
PySpark
# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)
// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

DataSaver クラスは、スコアリングや特徴エンジニアリングの出力データなど、出力データの格納に関連するものをカプセル化します。データセーバーは DataSaver 抽象クラスを拡張し、 save 抽象メソッドを上書きする必要があります。
PySpark
The following table describes the abstract methods of a PySpark Data Saver class:
メソッドと説明 パラメーター
save(self, configProperties, dataframe)
出力データを DataFrame として受け取り、Platform データセットに保存します。
  • self :自己参照
  • configProperties :設定プロパティのマップ
  • dataframe :DataFrame の形式で保存するデータ
Spark (Scala)
The following table describes the abstract methods of a Spark Data Saver class:
メソッドと説明 パラメーター
save(configProperties, dataFrame)
出力データを DataFrame として受け取り、Platform データセットに保存します。
  • configProperties :設定プロパティのマップ
  • dataFrame :DataFrame の形式で保存するデータ

Save data to a Platform dataset

In order to store data onto a Platform dataset, the properties must be either provided or defined in the configuration file:
  • A valid Platform dataset ID to which data will be stored
  • 組織に属するテナント ID
次の例では、データ( prediction Platform)を データセットに格納します。データセット ID( datasetId )とテナント ID( tenantId )は、設定ファイル内で定義されたプロパティです。
PySpark
# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)
// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

DatasetTransformer クラスは、データセットの構造を変更および変換します。The Sensei Machine Learning Runtime does not require this component to be defined, and is implemented based on your requirements.
特徴パイプラインに関しては、データセットトランスフォーマーを特徴パイプラインファクトリと協力して使用し、特徴エンジニアリングのためのデータを準備できます。
PySpark
次の表に、PySpark データセットトランスフォーマークラスのクラスメソッドを示します。
メソッドと説明 パラメーター
abstract transform(self, configProperties, dataset)
データセットを入力として受け取り、新しい派生データセットを出力します。
  • self :自己参照
  • configProperties :設定プロパティのマップ
  • dataset :変換される入力データセット
Spark (Scala)
The following table describes the abstract methods of a Spark dataset transformer class:
メソッドと説明 パラメーター
transform(configProperties, dataset)
データセットを入力として受け取り、新しい派生データセットを出力します。
  • configProperties :設定プロパティのマップ
  • dataset :変換される入力データセット

FeaturePipelineFactory

FeaturePipelineFactory クラスには、特徴抽出アルゴリズムが含まれ、特徴パイプラインのステージを開始から終了まで定義します。
PySpark
次の表に、PySpark FeaturePipelineFactory のクラスメソッドを示します。
メソッドと説明 パラメーター
abstract create_pipeline(self, configProperties)
一連の Spark トランスフォーマーを含む Spark パイプラインを作成して返します。
  • self :自己参照
  • configProperties :設定プロパティのマップ
abstract get_param_map(self, configProperties, sparkSession)
設定プロパティからパラメーターマップを取得して返します。
  • self :自己参照
  • configProperties :設定プロパティ
  • sparkSession :Spark セッション
Spark (Scala)
The following table describes the class methods of a Spark FeaturePipelineFactory:
メソッドと説明 パラメーター
abstract createPipeline(configProperties)
一連のトランスフォーマーを含むパイプラインを作成して返します。
  • configProperties :設定プロパティのマップ
abstract getParamMap(configProperties, sparkSession)
設定プロパティからパラメーターマップを取得して返します。
  • configProperties :設定プロパティ
  • sparkSession :Spark セッション

PipelineFactory

The PipelineFactory class encapsulates methods and definitions for model training and scoring, where training logic and algorithms are defined in the form of a Spark Pipeline.
PySpark
次の表に、PySpark PipelineFactory のクラスメソッドを示します。
メソッドと説明 パラメーター
abstract apply(self, configProperties)
モデルのトレーニングとスコアリングのロジックとアルゴリズムを含む Spark パイプラインを作成して返します。
  • self :自己参照
  • configProperties :設定プロパティ
abstract train(self, configProperties, dataframe)
モデルをトレーニングするロジックとアルゴリズムを含むカスタムパイプラインを返します。Spark パイプラインを使用する場合、このメソッドは不要です。
  • self :自己参照
  • configProperties :設定プロパティ
  • dataframe :トレーニング入力の特徴データセット
abstract score(self, configProperties, dataframe, model)
訓練済みモデルを使用してスコアを付け、結果を返します。
  • self :自己参照
  • configProperties :設定プロパティ
  • dataframe :スコアリング用の入力データセット
  • model :スコアリングに使用される訓練済みモデル
abstract get_param_map(self, configProperties, sparkSession)
設定プロパティからパラメーターマップを取得して返します。
  • self :自己参照
  • configProperties :設定プロパティ
  • sparkSession :Spark セッション
Spark (Scala)
The following table describes the class methods of a Spark PipelineFactory:
メソッドと説明 パラメーター
abstract apply(configProperties)
モデルのトレーニングとスコアリングのロジックとアルゴリズムを含むパイプラインを作成して返します。
  • configProperties :設定プロパティ
abstract getParamMap(configProperties, sparkSession)
設定プロパティからパラメーターマップを取得して返します。
  • configProperties :設定プロパティ
  • sparkSession :Spark セッション

MLEvaluator

MLEvaluator クラスは、評価指標を定義するメソッドと、トレーニングとテストデータセットを決定するためのメソッドを提供します。
PySpark
次の表に、PySpark MLEvaluator のクラスメソッドを示します。
メソッドと説明 パラメーター
abstract split(self, configProperties, dataframe)
入力データセットをトレーニングサブセットとテストサブセットに分割します。
  • self :自己参照
  • configProperties :設定プロパティ
  • dataframe :分割する入力データセット
abstract evaluate(self, dataframe, model, configProperties)
訓練済みモデルを評価し、評価結果を返します。
  • self :自己参照
  • dataframe :トレーニングデータとテストデータから成る DataFrame
  • model :訓練済みモデル
  • configProperties :設定プロパティ
Spark (Scala)
The following table describes the class methods of a Spark MLEvaluator:
メソッドと説明 パラメーター
abstract split(configProperties, data)
入力データセットをトレーニングサブセットとテストサブセットに分割します。
  • configProperties :設定プロパティ
  • data :分割する入力データセット
abstract evaluate(configProperties, model, data)
訓練済みモデルを評価し、評価結果を返します。
  • configProperties :設定プロパティ
  • model :訓練済みモデル
  • data :トレーニングデータとテストデータから成る DataFrame