Accès aux données à l’aide de Spark dans Data Science Workspace

Le document suivant contient des exemples d’accès aux données à l’aide de Spark pour une utilisation dans Data Science Workspace. Pour plus d’informations sur l’accès aux données à l’aide de notebooks JupyterLab, consultez la page Accès aux données des notebooks JupyterLab documentation.

Prise en main

Utilisation Spark nécessite des optimisations de performances qui doivent être ajoutées à la variable SparkSession. En outre, vous pouvez également configurer configProperties pour plus tard lire et écrire dans des jeux de données.

import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}

Class Helper {

 /**
   *
   * @param configProperties - Configuration Properties map
   * @param sparkSession     - SparkSession
   * @return                 - DataFrame which is loaded for training
   */

   def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
            // Read the configs
            val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
            val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
            val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
            val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

   }
}

Lecture d’un jeu de données

Lorsque vous utilisez Spark, vous avez accès à deux modes de lecture : interactive et par lots.

Le mode interactif crée une connexion Java Database Connectivity (JDBC) à Query Service et obtient des résultats par le biais d’un JDBC standard ResultSet qui est automatiquement traduit en un DataFrame. Ce mode fonctionne de la même manière que le mode intégré Spark method spark.read.jdbc(). Ce mode est destiné uniquement aux petits jeux de données. Si votre jeu de données dépasse 5 millions de lignes, il est conseillé de passer en mode batch.

Le mode par lot utilise Query ServiceCommande COPY de pour générer les jeux de résultats Parquet dans un emplacement partagé. Ces fichiers Parquet peuvent ensuite être traités plus en détail.

Vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode interactif :

  // Read the configs
    val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
    val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
    val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
    val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

 val dataSetId: String = configProperties.get(taskId).getOrElse("")

    // Load the dataset
    var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "interactive")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df
  }

De même, vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode batch :

val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.mode, "batch")
      .option(QSOption.datasetId, dataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .load()
    df.show()
    df

SÉLECTIONNER des colonnes du jeu de données

df = df.select("column-a", "column-b").show()

Clause DISTINCT

La clause DISTINCT vous permet de récupérer toutes les valeurs distinctes au niveau d’une ligne/colonne, supprimant toutes les valeurs en double de la réponse.

Exemple d’utilisation de la méthode distinct() est visible ci-dessous :

df = df.select("column-a", "column-b").distinct().show()

Clause WHERE

Le Spark Le SDK permet deux méthodes de filtrage : Utilisation d’une expression SQL ou filtrage par conditions.

Vous trouverez ci-dessous un exemple d’utilisation de ces fonctions de filtrage :

Expression SQL

df.where("age > 15")

Critères de filtrage

df.where("age" > 15 || "name" = "Steve")

Clause ORDER BY

La clause ORDER BY permet de trier les résultats reçus par une colonne spécifiée dans un ordre spécifique (croissant ou décroissant). Dans le Spark Pour ce faire, utilisez le SDK sort() fonction .

Exemple d’utilisation de la méthode sort() est visible ci-dessous :

df = df.sort($"column1", $"column2".desc)

Clause LIMIT

La clause LIMIT vous permet de limiter le nombre d’enregistrements reçus du jeu de données.

Exemple d’utilisation de la méthode limit() est visible ci-dessous :

df = df.limit(100)

Écriture dans un jeu de données

En utilisant configProperties mappage, vous pouvez écrire sur un jeu de données dans Experience Platform à l’aide de QSOption.

val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString

    df.write.format(PLATFORM_SDK_PQS_PACKAGE)
      .option(QSOption.userToken, userToken)
      .option(QSOption.imsOrg, orgId)
      .option(QSOption.apiKey, apiKey)
      .option(QSOption.datasetId, scoringResultsDataSetId)
      .option(QSOption.sandboxName, sandboxName)
      .save()

Étapes suivantes

Adobe Experience Platform Data Science Workspace fournit un exemple de recette Scala (Spark) qui utilise les exemples de code ci-dessus pour lire et écrire des données. Si vous souhaitez en savoir plus sur l’utilisation de Spark pour accéder à vos données, veuillez consulter la section Référentiel GitHub de Data Science Workspace.

recommendation-more-help
cc79fe26-64da-411e-a6b9-5b650f53e4e9