Show Menu
TEMAS×

Guía para desarrolladores de SDK

El SDK de creación de modelos le permite desarrollar fórmulas de aprendizaje automático personalizadas y tuberías de funciones que se pueden utilizar en Adobe Experience Platform Área de trabajo de ciencias de datos, proporcionando plantillas implementables en PySpark y Spark (Scala).
Este documento proporciona información sobre las distintas clases que se encuentran en el SDK de creación de modelos.

DataLoader

La clase DataLoader encapsula todo lo relacionado con la recuperación, filtrado y devolución de datos de entrada sin procesar. Algunos ejemplos de datos de entrada son los de capacitación, puntuación o ingeniería de funciones. Los cargadores de datos extienden la clase abstracta DataLoader y deben anular el método abstracto load .
PySpark
En la tabla siguiente se describen los métodos abstractos de una clase PySpark Data Loader:
Método y descripción Parámetros
load(self, configProperties, spark)
Carga y devolución de datos de plataforma como un DataFrame de Pandas
  • self :: Referencia automática
  • configProperties :: Asignación de propiedades de configuración
  • spark :: Sesión de encendido
Spark
En la tabla siguiente se describen los métodos abstractos de una clase Spark Data Loader:
Método y descripción Parámetros
load(configProperties, sparkSession)
Carga y devolución de datos de plataforma como marco de datos
  • configProperties :: Asignación de propiedades de configuración
  • sparkSession :: Sesión de encendido

Carga de datos desde un Platform conjunto de datos

En el ejemplo siguiente se recuperan Platform los datos por ID y se devuelve un DataFrame, donde el ID del conjunto de datos ( datasetId ) es una propiedad definida en el archivo de configuración.
PySpark
# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Chispa (Scala)
// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

La clase DataSaver encapsula todo lo relacionado con el almacenamiento de datos de salida, incluidos los datos de puntuación o ingeniería de características. Los responsables de guardar datos amplían la clase abstracta DataSaver y deben anular el método abstracto save .
PySpark
En la tabla siguiente se describen los métodos abstractos de una clase PySpark Data Saver:
Método y descripción Parámetros
save(self, configProperties, dataframe)
Recibir datos de salida como marco de datos y almacenarlos en un conjunto de datos de plataforma
  • self :: Referencia automática
  • configProperties :: Asignación de propiedades de configuración
  • dataframe :: Datos que se almacenarán en forma de marco de datos
Chispa (Scala)
En la tabla siguiente se describen los métodos abstractos de una clase Spark Data Saver:
Método y descripción Parámetros
save(configProperties, dataFrame)
Recibir datos de salida como marco de datos y almacenarlos en un conjunto de datos de plataforma
  • configProperties :: Asignación de propiedades de configuración
  • dataFrame :: Datos que se almacenarán en forma de marco de datos

Guardar datos en un Platform conjunto de datos

Para almacenar datos en un Platform conjunto de datos, las propiedades deben proporcionarse o definirse en el archivo de configuración:
  • Un ID Platform de conjunto de datos válido al que se almacenarán los datos
  • El ID de inquilino que pertenece a su organización
Los siguientes ejemplos almacenan datos ( prediction ) en un Platform conjunto de datos, donde la ID del conjunto de datos ( datasetId ) y la ID del inquilino ( tenantId ) son propiedades definidas dentro del archivo de configuración.
PySpark
# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Chispa (Scala)
// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

La clase DatasetTransformer modifica y transforma la estructura de un conjunto de datos. El componente Sensei Machine Learning Runtime no requiere que se defina y se implementa según sus necesidades.
Con respecto a una canalización de funciones, los transformadores de conjuntos de datos se pueden utilizar de manera conjunta con una fábrica de canalizaciones de funciones para preparar datos para la ingeniería de funciones.
PySpark
La siguiente tabla describe los métodos de clase de una clase de transformador de conjuntos de datos PySpark:
Método y descripción Parámetros
abstract transform(self, configProperties, dataset)
Toma un conjunto de datos como entrada y genera un nuevo conjunto de datos derivado
  • self :: Referencia automática
  • configProperties :: Asignación de propiedades de configuración
  • dataset :: El conjunto de datos de entrada para la transformación
Chispa (Scala)
En la tabla siguiente se describen los métodos abstractos de una clase de transformador de Spark conjuntos de datos:
Método y descripción Parámetros
transform(configProperties, dataset)
Toma un conjunto de datos como entrada y genera un nuevo conjunto de datos derivado
  • configProperties :: Asignación de propiedades de configuración
  • dataset :: El conjunto de datos de entrada para la transformación

FeaturePipelineFactory

La clase FeaturePipelineFactory contiene algoritmos de extracción de funciones y define las etapas de una tubería de funciones de inicio a fin.
PySpark
En la tabla siguiente se describen los métodos de clase de PySpark FeaturePipelineFactory:
Método y descripción Parámetros
abstract create_pipeline(self, configProperties)
Crear y devolver una tubería de spark que contenga una serie de transformadores de chispa
  • self :: Referencia automática
  • configProperties :: Asignación de propiedades de configuración
abstract get_param_map(self, configProperties, sparkSession)
Recuperar y devolver mapa de parámetros de las propiedades de configuración
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
  • sparkSession :: Sesión de encendido
Chispa (Scala)
En la tabla siguiente se describen los métodos de clase de un Spark FeaturePipelineFactory:
Método y descripción Parámetros
abstract createPipeline(configProperties)
Crear y devolver una canalización que contenga una serie de transformadores
  • configProperties :: Asignación de propiedades de configuración
abstract getParamMap(configProperties, sparkSession)
Recuperar y devolver mapa de parámetros de las propiedades de configuración
  • configProperties :: Propiedades de configuración
  • sparkSession :: Sesión de encendido

PipelineFactory

La clase PipelineFactory encapsula métodos y definiciones para la formación y puntuación de modelos, donde la lógica y los algoritmos de formación se definen en forma de Spark Canalización.
PySpark
En la tabla siguiente se describen los métodos de clase de PySpark PipelineFactory:
Método y descripción Parámetros
abstract apply(self, configProperties)
Crear y devolver una tubería de spark que contiene la lógica y el algoritmo para la formación y la puntuación de modelos
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
abstract train(self, configProperties, dataframe)
Devolver una canalización personalizada que contenga la lógica y el algoritmo para entrenar un modelo. Este método no es necesario si se utiliza una canalización de Spark
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
  • dataframe :: Conjunto de datos de funciones para entradas de formación
abstract score(self, configProperties, dataframe, model)
Puntuación mediante el modelo entrenado y devolución de los resultados
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
  • dataframe :: Conjunto de datos de entrada para la puntuación
  • model :: Un modelo entrenado utilizado para la puntuación
abstract get_param_map(self, configProperties, sparkSession)
Recuperar y devolver mapa de parámetros de las propiedades de configuración
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
  • sparkSession :: Sesión de encendido
Chispa (Scala)
En la tabla siguiente se describen los métodos de clase de un Spark PipelineFactory:
Método y descripción Parámetros
abstract apply(configProperties)
Crear y devolver una canalización que contenga la lógica y el algoritmo para la formación y la puntuación de modelos
  • configProperties :: Propiedades de configuración
abstract getParamMap(configProperties, sparkSession)
Recuperar y devolver mapa de parámetros de las propiedades de configuración
  • configProperties :: Propiedades de configuración
  • sparkSession :: Sesión de encendido

MLEvaluator

La clase MLEvaluator proporciona métodos para definir métricas de evaluación y determinar conjuntos de datos de prueba y formación.
PySpark
En la tabla siguiente se describen los métodos de clase de PySpark MLEvaluator:
Método y descripción Parámetros
abstract split(self, configProperties, dataframe)
Divide el conjunto de datos de entrada en subconjuntos de prueba y formación
  • self :: Referencia automática
  • configProperties :: Propiedades de configuración
  • dataframe :: Conjunto de datos de entrada que se va a dividir
abstract evaluate(self, dataframe, model, configProperties)
Evalúa un modelo capacitado y devuelve los resultados de la evaluación
  • self :: Referencia automática
  • dataframe :: Un marco de datos que consta de datos de prueba y formación
  • model :: Un modelo entrenado
  • configProperties :: Propiedades de configuración
Chispa (Scala)
En la tabla siguiente se describen los métodos de clase de un Spark valor MLE:
Método y descripción Parámetros
abstract split(configProperties, data)
Divide el conjunto de datos de entrada en subconjuntos de prueba y formación
  • configProperties :: Propiedades de configuración
  • data :: Conjunto de datos de entrada que se va a dividir
abstract evaluate(configProperties, model, data)
Evalúa un modelo capacitado y devuelve los resultados de la evaluación
  • configProperties :: Propiedades de configuración
  • model :: Un modelo entrenado
  • data :: Un marco de datos que consta de datos de prueba y formación