Show Menu
THEMEN×

SDK-Entwicklerhandbuch

Das Model Authoring SDK ermöglicht Ihnen die Entwicklung von benutzerdefinierten maschinellen Lernrezepten und Feature Pipelines, die im Data Science Workspace verwendet werden können. Es bietet implementierbare Vorlagen in PySpark und Spark (Scala). Adobe Experience Platform
Dieses Dokument enthält Informationen zu den verschiedenen Klassen im Model Authoring-SDK.

DataLoader

Die DataLoader-Klasse enthält alle Daten zum Abrufen, Filtern und Zurückgeben von Rohdaten. Beispiele für Eingabedaten sind Schulungsdaten, Bewertungsdaten oder Funktionstechniken. Datenlader erweitern die abstrakte Klasse DataLoader und müssen die abstrakte Methode überschreiben load .
PySpark
In der folgenden Tabelle werden die abstrakten Methoden einer PySpark Data Loader-Klasse beschrieben:
Methode und Beschreibung Parameter
load(self, configProperties, spark)
Plattformdaten als Pandas-Datenrahmen laden und zurückgeben
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften-Map
  • spark : Spark-Sitzung
Spark
In der folgenden Tabelle werden die abstrakten Methoden einer Spark Data Loader-Klasse beschrieben:
Methode und Beschreibung Parameter
load(configProperties, sparkSession)
Plattformdaten als DataFrame laden und zurückgeben
  • configProperties : Konfigurationseigenschaften-Map
  • sparkSession : Spark-Sitzung

Daten aus einem Plattformdataset laden

Im folgenden Beispiel werden Plattformdaten nach ID abgerufen und ein DataFrame zurückgegeben, wobei die DataSet-ID ( datasetId ) eine definierte Eigenschaft in der Konfigurationsdatei ist.
PySpark
# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)
// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

Die DataSaver-Klasse kapselt alles, was mit dem Speichern von Ausgabedaten zu tun hat, einschließlich Daten aus der Scoring- oder Funktionstechnik. Datensparer erweitern die abstrakte Klasse DataSaver und müssen die abstrakte Methode überschreiben save .
PySpark
In der folgenden Tabelle werden die abstrakten Methoden einer PySpark Data Saver-Klasse beschrieben:
Methode und Beschreibung Parameter
save(self, configProperties, dataframe)
Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften-Map
  • dataframe : Daten, die in Form eines DataFrame gespeichert werden sollen
Spark (Scala)
In der folgenden Tabelle werden die abstrakten Methoden einer Spark Data Saver-Klasse beschrieben:
Methode und Beschreibung Parameter
save(configProperties, dataFrame)
Ausgabedaten als DataFrame empfangen und in einem Platform-Datensatz speichern
  • configProperties : Konfigurationseigenschaften-Map
  • dataFrame : Daten, die in Form eines DataFrame gespeichert werden sollen

Daten in einem Platform-Datensatz speichern

Um Daten in einem Platform-Dataset zu speichern, müssen die Eigenschaften entweder bereitgestellt oder in der Konfigurationsdatei definiert werden:
  • Eine gültige Plattform-Datensatz-ID, in der Daten gespeichert werden
  • Die Mandanten-ID in Ihrer Organisation
Die folgenden Beispiele speichern Daten ( prediction ) in einem Platform-Dataset, wobei die Dataset-ID ( datasetId ) und die Mandant-ID ( tenantId ) Eigenschaften in der Konfigurationsdatei sind.
PySpark
# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)
// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

Die DatasetTransformer-Klasse ändert und transformiert die Struktur eines Datensatzes. Die Sensei Machine Learning Runtime erfordert keine Definition dieser Komponente und wird entsprechend Ihren Anforderungen implementiert.
Im Hinblick auf eine Feature-Pipeline können DataSet-Transformer gemeinsam mit einer Feature-Pipeline-Fabrik zur Vorbereitung von Daten für die Funktionstechnik eingesetzt werden.
PySpark
In der folgenden Tabelle werden die Klassenmethoden einer PySpark-DataSet-Transformer-Klasse beschrieben:
Methode und Beschreibung Parameter
abstract transform(self, configProperties, dataset)
Übernimmt einen Datensatz als Eingabe und Ausgabe eines neuen abgeleiteten Datensatzes
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften-Map
  • dataset : Das Eingabedataset für die Transformation
Spark (Scala)
Die folgende Tabelle beschreibt die abstrakten Methoden einer Spark-DataSet-Transformer-Klasse:
Methode und Beschreibung Parameter
transform(configProperties, dataset)
Übernimmt einen Datensatz als Eingabe und Ausgabe eines neuen abgeleiteten Datensatzes
  • configProperties : Konfigurationseigenschaften-Map
  • dataset : Das Eingabedataset für die Transformation

FeaturePipelineFactory

Die FeaturePipelineFactory-Klasse enthält Algorithmen zur Extraktion von Funktionen und definiert die Phasen einer Feature Pipeline von Beginn zu Ende.
PySpark
In der folgenden Tabelle werden die Klassenmethoden einer PySpark FeaturePipelineFactory beschrieben:
Methode und Beschreibung Parameter
abstract create_pipeline(self, configProperties)
Erstellen und Zurückgeben einer Spark-Pipeline, die eine Reihe von Spark-Transformers enthält
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften-Map
abstract get_param_map(self, configProperties, sparkSession)
Abrufen und Zurückgeben der Parameterzuordnung aus den Konfigurationseigenschaften
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
  • sparkSession : Spark-Sitzung
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden einer Spark FeaturePipelineFactory beschrieben:
Methode und Beschreibung Parameter
abstract createPipeline(configProperties)
Erstellen und Zurückgeben einer Pipeline mit einer Reihe von Transformatoren
  • configProperties : Konfigurationseigenschaften-Map
abstract getParamMap(configProperties, sparkSession)
Abrufen und Zurückgeben der Parameterzuordnung aus den Konfigurationseigenschaften
  • configProperties : Konfigurationseigenschaften
  • sparkSession : Spark-Sitzung

PipelineFactory

Die PipelineFactory-Klasse enthält Methoden und Definitionen für Modellschulungen und -bewertungen, wobei Schulungslogik und -algorithmen in Form einer Spark-Pipeline definiert werden.
PySpark
Die folgende Tabelle beschreibt die Klassenmethoden einer PySpark PipelineFactory:
Methode und Beschreibung Parameter
abstract apply(self, configProperties)
Erstellen und Zurückgeben einer Spark-Pipeline, die die Logik und den Algorithmus für Modellschulungen und -bewertungen enthält
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
abstract train(self, configProperties, dataframe)
Geben Sie eine benutzerdefinierte Pipeline zurück, die die Logik und den Algorithmus enthält, um ein Modell auszubilden. Diese Methode ist nicht erforderlich, wenn eine Spark-Pipeline verwendet wird
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
  • dataframe : Funktionsdatensatz für Schulungseingaben
abstract score(self, configProperties, dataframe, model)
Ergebnis mit dem geschulten Modell und Rückgabe der Ergebnisse
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
  • dataframe : Eingabedatensatz für die Bewertung
  • model : Ein ausgebildetes Modell, das zur Bewertung verwendet wird
abstract get_param_map(self, configProperties, sparkSession)
Abrufen und Zurückgeben der Parameterzuordnung aus den Konfigurationseigenschaften
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
  • sparkSession : Spark-Sitzung
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden einer Spark PipelineFactory beschrieben:
Methode und Beschreibung Parameter
abstract apply(configProperties)
Erstellen und Zurückgeben einer Pipeline, die die Logik und den Algorithmus für Modellschulungen und -bewertungen enthält
  • configProperties : Konfigurationseigenschaften
abstract getParamMap(configProperties, sparkSession)
Abrufen und Zurückgeben der Parameterzuordnung aus den Konfigurationseigenschaften
  • configProperties : Konfigurationseigenschaften
  • sparkSession : Spark-Sitzung

MLEvaluator

Die MLEvaluator-Klasse stellt Methoden zum Definieren von Bewertungsmetriken und zum Festlegen von Schulungs- und Testdatasets bereit.
PySpark
In der folgenden Tabelle werden die Klassenmethoden eines PySpark MLEvaluators beschrieben:
Methode und Beschreibung Parameter
abstract split(self, configProperties, dataframe)
Teilt den Eingabedatensatz in die Untergruppen "Schulung"und "Test".
  • self : Eigene Referenz
  • configProperties : Konfigurationseigenschaften
  • dataframe : Zu teilendes Eingabedataset
abstract evaluate(self, dataframe, model, configProperties)
Wertet ein ausgebildetes Modell aus und gibt die Bewertungsergebnisse zurück
  • self : Eigene Referenz
  • dataframe : Ein DataFrame, bestehend aus Schulungs- und Testdaten
  • model : Ein geschultes Modell
  • configProperties : Konfigurationseigenschaften
Spark (Scala)
In der folgenden Tabelle werden die Klassenmethoden eines Spark MLEvaluators beschrieben:
Methode und Beschreibung Parameter
abstract split(configProperties, data)
Teilt den Eingabedatensatz in die Untergruppen "Schulung"und "Test".
  • configProperties : Konfigurationseigenschaften
  • data : Zu teilendes Eingabedataset
abstract evaluate(configProperties, model, data)
Wertet ein ausgebildetes Modell aus und gibt die Bewertungsergebnisse zurück
  • configProperties : Konfigurationseigenschaften
  • model : Ein geschultes Modell
  • data : Ein DataFrame, bestehend aus Schulungs- und Testdaten