Accès aux données à l’aide de Spark dans Data Science Workspace
Le document suivant contient des exemples d’accès aux données à l’aide de Spark pour une utilisation dans Data Science Workspace. Pour plus d’informations sur l’accès aux données à l’aide de notebooks JupyterLab, consultez la page Accès aux données des notebooks JupyterLab documentation.
Prise en main
Utilisation Spark nécessite des optimisations de performances qui doivent être ajoutées à la variable SparkSession
. En outre, vous pouvez également configurer configProperties
pour plus tard lire et écrire dans des jeux de données.
import com.adobe.platform.ml.config.ConfigProperties
import com.adobe.platform.query.QSOption
import org.apache.spark.sql.{DataFrame, SparkSession}
Class Helper {
/**
*
* @param configProperties - Configuration Properties map
* @param sparkSession - SparkSession
* @return - DataFrame which is loaded for training
*/
def load_dataset(configProperties: ConfigProperties, sparkSession: SparkSession, taskId: String): DataFrame = {
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
}
}
Lecture d’un jeu de données
Lorsque vous utilisez Spark, vous avez accès à deux modes de lecture : interactive et par lots.
Le mode interactif crée une connexion Java Database Connectivity (JDBC) à Query Service et obtient des résultats par le biais d’un JDBC standard ResultSet
qui est automatiquement traduit en un DataFrame
. Ce mode fonctionne de la même manière que le mode intégré Spark method spark.read.jdbc()
. Ce mode est destiné uniquement aux petits jeux de données. Si votre jeu de données dépasse 5 millions de lignes, il est conseillé de passer en mode batch.
Le mode par lot utilise Query ServiceCommande COPY de pour générer les jeux de résultats Parquet dans un emplacement partagé. Ces fichiers Parquet peuvent ensuite être traités plus en détail.
Vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode interactif :
// Read the configs
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
val dataSetId: String = configProperties.get(taskId).getOrElse("")
// Load the dataset
var df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "interactive")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
}
De même, vous trouverez ci-dessous un exemple de lecture d’un jeu de données en mode batch :
val df = sparkSession.read.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.mode, "batch")
.option(QSOption.datasetId, dataSetId)
.option(QSOption.sandboxName, sandboxName)
.load()
df.show()
df
SÉLECTIONNER des colonnes du jeu de données
df = df.select("column-a", "column-b").show()
Clause DISTINCT
La clause DISTINCT vous permet de récupérer toutes les valeurs distinctes au niveau d’une ligne/colonne, supprimant toutes les valeurs en double de la réponse.
Exemple d’utilisation de la méthode distinct()
est visible ci-dessous :
df = df.select("column-a", "column-b").distinct().show()
Clause WHERE
Le Spark Le SDK permet deux méthodes de filtrage : Utilisation d’une expression SQL ou filtrage par conditions.
Vous trouverez ci-dessous un exemple d’utilisation de ces fonctions de filtrage :
Expression SQL
df.where("age > 15")
Critères de filtrage
df.where("age" > 15 || "name" = "Steve")
Clause ORDER BY
La clause ORDER BY permet de trier les résultats reçus par une colonne spécifiée dans un ordre spécifique (croissant ou décroissant). Dans le Spark Pour ce faire, utilisez le SDK sort()
fonction .
Exemple d’utilisation de la méthode sort()
est visible ci-dessous :
df = df.sort($"column1", $"column2".desc)
Clause LIMIT
La clause LIMIT vous permet de limiter le nombre d’enregistrements reçus du jeu de données.
Exemple d’utilisation de la méthode limit()
est visible ci-dessous :
df = df.limit(100)
Écriture dans un jeu de données
En utilisant configProperties
mappage, vous pouvez écrire sur un jeu de données dans Experience Platform à l’aide de QSOption
.
val userToken: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_TOKEN", "").toString
val orgId: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_ORG_ID", "").toString
val apiKey: String = sparkSession.sparkContext.getConf.get("ML_FRAMEWORK_IMS_CLIENT_ID", "").toString
val sandboxName: String = sparkSession.sparkContext.getConf.get("sandboxName", "").toString
df.write.format(PLATFORM_SDK_PQS_PACKAGE)
.option(QSOption.userToken, userToken)
.option(QSOption.imsOrg, orgId)
.option(QSOption.apiKey, apiKey)
.option(QSOption.datasetId, scoringResultsDataSetId)
.option(QSOption.sandboxName, sandboxName)
.save()
Étapes suivantes
Adobe Experience Platform Data Science Workspace fournit un exemple de recette Scala (Spark) qui utilise les exemples de code ci-dessus pour lire et écrire des données. Si vous souhaitez en savoir plus sur l’utilisation de Spark pour accéder à vos données, veuillez consulter la section Référentiel GitHub de Data Science Workspace.