Show Menu
TÓPICOS×

Guia do desenvolvedor do SDK

O SDK de criação de modelo permite desenvolver Fórmulas de aprendizado e Pipelines de Recursos personalizados de máquina que podem ser usados na Adobe Experience Platform Data Science Workspace, fornecendo modelos implementáveis em PySpark e Spark (Scala).
Este documento fornece informações sobre as várias classes encontradas no SDK de criação de modelo.

DataLoader

A classe DataLoader encapsula qualquer coisa relacionada à recuperação, filtragem e devolução de dados de entrada brutos. Exemplos de dados de entrada incluem aqueles para treinamento, pontuação ou engenharia de recursos. Os carregadores de dados estendem a classe abstrata DataLoader e devem substituir o método abstrato load .
PySpark
A tabela a seguir descreve os métodos abstratos de uma classe PySpark Data Loader:
Método e descrição Parâmetros
load(self, configProperties, spark)
Carregar e retornar dados do Platform como um DataFrame dos Paindas
  • self : Referência automática
  • configProperties : Mapa de propriedades de configuração
  • spark : Sessão Spark
Faísca
A tabela a seguir descreve os métodos abstratos de uma classe de Spark Carregador de dados:
Método e descrição Parâmetros
load(configProperties, sparkSession)
Carregar e retornar dados do Platform como um DataFrame
  • configProperties : Mapa de propriedades de configuração
  • sparkSession : Sessão Spark

Carregar dados de um Platform conjunto de dados

O exemplo a seguir recupera Platform dados por ID e retorna um DataFrame, onde a ID do conjunto de dados ( datasetId ) é uma propriedade definida no arquivo de configuração.
PySpark
# PySpark

from sdk.data_loader import DataLoader

class MyDataLoader(DataLoader):
    """
    Implementation of DataLoader which loads a DataFrame and prepares data
    """

    def load_dataset(config_properties, spark, task_id):

        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"
        PLATFORM_SDK_PQS_INTERACTIVE = "interactive"

        # prepare variables
        service_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(spark.sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        dataset_id = str(config_properties.get(task_id))

        # validate variables
        for arg in ['service_token', 'user_token', 'org_id', 'dataset_id', 'api_key']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)

        # load dataset through Spark session

        query_options = get_query_options(spark.sparkContext)

        pd = spark.read.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.mode(), PLATFORM_SDK_PQS_INTERACTIVE) \
            .option(query_options.datasetId(), dataset_id) \
            .load()
        pd.show()

        # return as DataFrame
        return pd

Spark (Scala)
// Spark

package com.adobe.platform.ml

import java.time.LocalDateTime

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{StructType, TimestampType}
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.Column

/**
 * Implementation of DataLoader which loads a DataFrame and prepares data
 */
class MyDataLoader extends DataLoader {

    final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
    final val PLATFORM_SDK_PQS_INTERACTIVE: String = "interactive"
    final val PLATFORM_SDK_PQS_BATCH: String = "batch"

    /**
    *
    * @param configProperties - Configuration Properties map
    * @param sparkSession     - SparkSession
    * @return                 - DataFrame which is loaded for training
    */


  def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {

    require(configProperties != null)
    require(sparkSession != null)

    // Read the configs
    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString

    val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, PLATFORM_SDK_PQS_INTERACTIVE)
      .option(QSOption.datasetId, dataSetId)
      .load()
    df.show()
    df
    }
}

DataSaver

A classe DataSaver encapsula qualquer coisa relacionada ao armazenamento de dados de saída, incluindo os dados de pontuação ou engenharia de recursos. Os servidores de dados estendem a classe abstrata DataSaver e devem substituir o método abstrato save .
PySpark
A tabela a seguir descreve os métodos abstratos de uma classe do PySpark Data Saver:
Método e descrição Parâmetros
save(self, configProperties, dataframe)
Receber dados de saída como um DataFrame e armazená-los em um conjunto de dados Platform
  • self : Referência automática
  • configProperties : Mapa de propriedades de configuração
  • dataframe : Dados a serem armazenados na forma de um DataFrame
Spark (Scala)
A tabela a seguir descreve os métodos abstratos de uma classe do Spark Data Saver:
Método e descrição Parâmetros
save(configProperties, dataFrame)
Receber dados de saída como um DataFrame e armazená-los em um conjunto de dados Platform
  • configProperties : Mapa de propriedades de configuração
  • dataFrame : Dados a serem armazenados na forma de um DataFrame

Salvar dados em um Platform conjunto de dados

Para armazenar dados em um Platform conjunto de dados, as propriedades devem ser fornecidas ou definidas no arquivo de configuração:
  • Uma ID válida do Platform conjunto de dados para a qual os dados serão armazenados
  • A ID do locatário pertencente à sua organização
Os exemplos a seguir armazenam dados ( prediction ) em um Platform conjunto de dados, onde a ID do conjunto de dados ( datasetId ) e a ID do locatário ( tenantId ) são propriedades definidas no arquivo de configuração.
PySpark
# PySpark

from sdk.data_saver import DataSaver
from pyspark.sql.types import StringType, TimestampType
from pyspark.sql.functions import col, lit, struct
from .helper import *


class MyDataSaver(DataSaver):
    """
    Implementation of DataSaver which stores a DataFrame to a Platform dataset
    """

    def save(self, config_properties, prediction):

        # Spark context
        sparkContext = prediction._sc

        # preliminary checks
        if config_properties is None:
            raise ValueError("config_properties parameter is null")
        if prediction is None:
            raise ValueError("prediction parameter is null")
        if sparkContext is None:
            raise ValueError("sparkContext parameter is null")
        
        PLATFORM_SDK_PQS_PACKAGE = "com.adobe.platform.query"

        # prepare variables
        scored_dataset_id = str(config_properties.get("scoringResultsDataSetId"))
        tenant_id = str(config_properties.get("tenant_id"))
        timestamp = "2019-01-01 00:00:00"

        service_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ML_TOKEN"))
        user_token = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_TOKEN"))
        org_id = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_ORG_ID"))
        api_key = str(sparkContext.getConf().get("ML_FRAMEWORK_IMS_CLIENT_ID"))

        # validate variables
       for arg in ['service_token', 'user_token', 'org_id', 'scored_dataset_id', 'api_key', 'tenant_id']:
            if eval(arg) == 'None':
                raise ValueError("%s is empty" % arg)
        
        scored_df = prediction.withColumn("date", col("date").cast(StringType()))
        scored_df = scored_df.withColumn(tenant_id, struct(col("date"), col("store"), col("prediction")))
        scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType()))
        scored_df = scored_df.withColumn("_id", lit("empty"))
        scored_df = scored_df.withColumn("eventType", lit("empty")

        # store data into dataset

        query_options = get_query_options(sparkContext)

        scored_df.select(tenant_id, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE) \
            .option(query_options.userToken(), user_token) \
            .option(query_options.serviceToken(), service_token) \
            .option(query_options.imsOrg(), org_id) \
            .option(query_options.apiKey(), api_key) \
            .option(query_options.datasetId(), scored_dataset_id) \
            .save()

Spark (Scala)
// Spark

package com.adobe.platform.ml

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.ml.impl.Constants
import com.adobe.platform.ml.sdk.DataSaver
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.TimestampType

/**
 * Implementation of DataSaver which stores a DataFrame to a Platform dataset
 */

class ScoringDataSaver extends DataSaver {

  final val PLATFORM_SDK_PQS_PACKAGE: String = "com.adobe.platform.query"
  final val PLATFORM_SDK_PQS_BATCH: String = "batch"

  /**
    * Method that saves the scoring data into a dataframe
    * @param configProperties  - Configuration Properties map
    * @param dataFrame         - Dataframe with the scoring results
    */
    
  override def save(configProperties: ConfigProperties, dataFrame: DataFrame): Unit =  {

    require(configProperties != null)
    require(dataFrame != null)

    val predictionColumn = configProperties.get(Constants.PREDICTION_COL).getOrElse(Constants.DEFAULT_PREDICTION)
    val sparkSession = dataFrame.sparkSession

    val serviceToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ML_TOKEN", "").toString
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val tenantId:String = configProperties.get("tenantId").getOrElse("")
    val timestamp:String = "2019-01-01 00:00:00"

    val scoringResultsDataSetId: String = configProperties.get("scoringResultsDataSetId").getOrElse("")
    import sparkSession.implicits._

    var df = dataFrame.withColumn("date", $"date".cast("String"))

    var scored_df  = df.withColumn(tenantId, struct(df("date"), df("store"), df(predictionColumn)))
    scored_df = scored_df.withColumn("timestamp", lit(timestamp).cast(TimestampType))
    scored_df = scored_df.withColumn("_id", lit("empty"))
    scored_df = scored_df.withColumn("eventType", lit("empty"))

    scored_df.select(tenantId, "_id", "eventType", "timestamp").write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.serviceToken, serviceToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .save()
    }
}

DatasetTransformer

A classe DatasetTransformer modifica e transforma a estrutura de um conjunto de dados. O componente não Sensei Machine Learning Runtime exige que esse componente seja definido e é implementado com base em seus requisitos.
No que diz respeito a um pipeline de recursos, os transformadores de conjuntos de dados podem ser usados em conjunto com uma fábrica de pipeline de recursos para preparar dados para a engenharia de recursos.
PySpark
A tabela a seguir descreve os métodos de classe de uma classe de transformador de conjunto de dados PySpark:
Método e descrição Parâmetros
resumo transform(self, configProperties, dataset)
Utiliza um conjunto de dados como entrada e saída de um novo conjunto de dados derivado
  • self : Referência automática
  • configProperties : Mapa de propriedades de configuração
  • dataset : O conjunto de dados de entrada para transformação
Spark (Scala)
A tabela a seguir descreve os métodos abstratos de uma classe de transformador de conjunto de Spark dados:
Método e descrição Parâmetros
transform(configProperties, dataset)
Utiliza um conjunto de dados como entrada e saída de um novo conjunto de dados derivado
  • configProperties : Mapa de propriedades de configuração
  • dataset : O conjunto de dados de entrada para transformação

FeaturePipelineFactory

A classe FeaturePipelineFactory contém algoritmos de extração de recursos e define os estágios de um Recurso Pipeline de start a fim.
PySpark
A tabela a seguir descreve os métodos de classe de um FeaturePipelineFactory do PySpark:
Método e descrição Parâmetros
resumo create_pipeline(self, configProperties)
Criar e retornar um pipeline Spark que contenha uma série de Transformadores Spark
  • self : Referência automática
  • configProperties : Mapa de propriedades de configuração
resumo get_param_map(self, configProperties, sparkSession)
Recuperar e retornar mapa de parâmetros das propriedades de configuração
  • self : Referência automática
  • configProperties : Propriedades de configuração
  • sparkSession : Sessão Spark
Spark (Scala)
A tabela a seguir descreve os métodos de classe de um Spark FeaturePipelineFactory:
Método e descrição Parâmetros
resumo createPipeline(configProperties)
Criar e retornar um Pipeline que contenha uma série de Transformers
  • configProperties : Mapa de propriedades de configuração
resumo getParamMap(configProperties, sparkSession)
Recuperar e retornar mapa de parâmetros das propriedades de configuração
  • configProperties : Propriedades de configuração
  • sparkSession : Sessão Spark

PipelineFactory

A classe PipelineFactory encapsula métodos e definições para treinamento e pontuação de modelo, onde lógica e algoritmos de treinamento são definidos na forma de um Spark Pipeline.
PySpark
A tabela a seguir descreve os métodos de classe de um PySpark PipelineFactory:
Método e descrição Parâmetros
resumo apply(self, configProperties)
Criar e Retornar um pipeline de Faísca que contém a lógica e o algoritmo para treinamento e pontuação de modelo
  • self : Referência automática
  • configProperties : Propriedades de configuração
resumo train(self, configProperties, dataframe)
Retorna um Pipeline personalizado que contém a lógica e o algoritmo para treinar um modelo. Este método não é necessário se for usado um Spark Pipeline
  • self : Referência automática
  • configProperties : Propriedades de configuração
  • dataframe : Conjunto de dados de recursos para entrada de treinamento
resumo score(self, configProperties, dataframe, model)
Pontuação usando o modelo treinado e retornando os resultados
  • self : Referência automática
  • configProperties : Propriedades de configuração
  • dataframe : Conjunto de dados de entrada para pontuação
  • model : Um modelo treinado usado para pontuação
resumo get_param_map(self, configProperties, sparkSession)
Recuperar e retornar mapa de parâmetros das propriedades de configuração
  • self : Referência automática
  • configProperties : Propriedades de configuração
  • sparkSession : Sessão Spark
Spark (Scala)
A tabela a seguir descreve os métodos de classe de um Spark PipelineFactory:
Método e descrição Parâmetros
resumo apply(configProperties)
Criar e Retornar um Pipeline que contém a lógica e o algoritmo para treinamento e pontuação do modelo
  • configProperties : Propriedades de configuração
resumo getParamMap(configProperties, sparkSession)
Recuperar e retornar mapa de parâmetros das propriedades de configuração
  • configProperties : Propriedades de configuração
  • sparkSession : Sessão Spark

MLEvaluator

A classe MLEvaluator fornece métodos para definir métricas de avaliação e determinar conjuntos de dados de treinamento e teste.
PySpark
A tabela a seguir descreve os métodos de classe de um PySpark MLEvaluator:
Método e descrição Parâmetros
resumo split(self, configProperties, dataframe)
Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste
  • self : Referência automática
  • configProperties : Propriedades de configuração
  • dataframe : Conjunto de dados de entrada a ser dividido
resumo evaluate(self, dataframe, model, configProperties)
Avalia um modelo treinado e retorna os resultados da avaliação
  • self : Referência automática
  • dataframe : Um DataFrame que consiste em dados de treinamento e teste
  • model : Um modelo treinado
  • configProperties : Propriedades de configuração
Spark (Scala)
A tabela a seguir descreve os métodos de classe de um Spark MLEvaluator:
Método e descrição Parâmetros
resumo split(configProperties, data)
Divide o conjunto de dados de entrada em subconjuntos de treinamento e teste
  • configProperties : Propriedades de configuração
  • data : Conjunto de dados de entrada a ser dividido
resumo evaluate(configProperties, model, data)
Avalia um modelo treinado e retorna os resultados da avaliação
  • configProperties : Propriedades de configuração
  • model : Um modelo treinado
  • data : Um DataFrame que consiste em dados de treinamento e teste