Flow Service API を使用したクラウドストレージソースのデータフローの作成
このチュートリアルでは、クラウドストレージソースからデータを取得し、Flow Service API を使用して Platform に取り込む手順について説明します。
はじめに
このチュートリアルは、Adobe Experience Platform の次のコンポーネントを実際に利用および理解しているユーザーを対象としています。
-
Experience Data Model (XDM) System:Experience Platform が顧客体験データを整理するための標準的なフレームワーク。
- スキーマ構成の基本:スキーマ構成の主要な原則やベストプラクティスなど、XDM スキーマの基本的な構成要素について説明します。
- スキーマレジストリ開発者ガイドには、Schema Registry API の呼び出しを正常に実行するために知っておくべき重要な情報が含まれています。これには、
{TENANT_ID}
、「コンテナ」の概念、リクエストを行うのに必要なヘッダー(Accept ヘッダーと使用可能な値には特に注意を払う)が含まれます。
-
Catalog Service:カタログは、 Experience Platform 内のデータの位置と系統を記録するシステムです。
-
Batch ingestion:Batch Ingestion API を使用すると、データをバッチファイルとして Experience Platform に取り込むことができます。
-
サンドボックス:Experience Platform は、単一の Platform インスタンスを個別の仮想環境に分割する仮想サンドボックスを提供し、デジタル体験アプリケーションの開発および進化を支援します。
Platform API の使用
Platform API を正常に呼び出す方法については詳しくは、Platform API の概要のガイドを参照してください。
ソース接続の作成 source
ソース接続を作成するには、 sourceConnections
の終点 Flow Service API は、ベース接続 ID、取り込むソースファイルへのパス、およびソースの対応する接続仕様 ID を提供します。
ソース接続を作成する場合は、データ形式属性の enum 値も定義する必要があります。
ファイルベースのソースには、次の列挙値を使用します。
delimited
json
parquet
すべてのテーブルベースのソースで、値を tabular
に設定します。
API 形式
POST /sourceConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited",
"properties": {
"columnDelimiter": "{COLUMN_DELIMITER}",
"encoding": "{ENCODING}",
"compressionType": "{COMPRESSION_TYPE}"
}
},
"params": {
"path": "/acme/summerCampaign/account.csv",
"type": "file"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
baseConnectionId
data.format
delimited
, JSON
、および parquet
.data.properties
data.properties.columnDelimiter
,
) がデフォルト値として使用されます。 注意: columnDelimiter
プロパティは、区切り文字付きファイルを取り込む場合にのみ使用できます。data.properties.encoding
UTF-8
および ISO-8859-1
. 注意: encoding
パラメーターは、区切られた CSV ファイルを取り込む場合にのみ使用できます。 その他のファイルタイプは、デフォルトのエンコーディングで取り込まれ、 UTF-8
.data.properties.compressionType
bzip2
, gzip
, deflate
, zipDeflate
, tarGzip
、および tar
. 注意: compressionType
プロパティは、区切り文字付きまたは JSON ファイルを取り込む場合にのみ使用できます。params.path
/acme/summerCampaign/*.csv
は全体を取り込みます /acme/summerCampaign/
フォルダー。params.type
file
個々のファイルを取り込んでタイプを使用するには folder
フォルダー全体を取り込みます。connectionSpec.id
応答
リクエストが成功した場合は、新たに作成されたソース接続の一意の ID(id
)が返されます。この ID は、後の手順でデータフローを作成する際に必要になります。
{
"id": "26b53912-1005-49f0-b539-12100559f0e2",
"etag": "\"11004d97-0000-0200-0000-5f3c3b140000\""
}
正規表現を使用して、取り込むファイルの特定のセットを選択する regex
正規表現を使用して、ソース接続の作成時に、ソースから Platform に特定のファイルセットを取り込むことができます。
API 形式
POST /sourceConnections
リクエスト
以下の例では、ファイルパスで正規表現を使用して、 premium
名前に。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/*premium*.csv",
"type": "folder"
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
データを再帰的に取り込むためのソース接続の設定
ソース接続を作成する際に、 recursive
深くネストされたフォルダーからデータを取り込むためのパラメーター。
API 形式
POST /sourceConnections
リクエスト
次の例では、 recursive: true
パラメーター通知 Flow Service をクリックして、取り込みプロセス中にサブフォルダーをすべて再帰的に読み込みます。
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage source connection",
"description: "Source connection for a cloud storage source with recursive ingestion",
"baseConnectionId": "1f164d1b-debe-4b39-b4a9-df767f7d6f7c",
"data": {
"format": "delimited"
},
"params": {
"path": "/acme/summerCampaign/customers/premium/buyers/recursive",
"type": "folder",
"recursive": true
},
"connectionSpec": {
"id": "4c10e202-c428-4796-9208-5f1f5732b1cf",
"version": "1.0"
}
}'
ターゲット XDM スキーマの作成 target-schema
ソースデータを Platform で使用するには、必要に応じてターゲットスキーマを作成してソースデータを構造化する必要があります。 次に、ターゲットスキーマを使用して、ソースデータが含まれる Platform データセットを作成します。
Schema Registry API に POST リクエストを実行することで、ターゲット XDM スキーマを作成できます。
ターゲット XDM スキーマの作成手順について詳しくは、 API を使用したスキーマの作成に関するチュートリアルを参照してください。
ターゲットデータセットの作成 target-dataset
Catalog Service API に POST リクエストを実行し、その際にペイロード内でターゲットスキーマの ID を指定することで、ターゲットデータセットを作成できます。
ターゲットデータセットの作成手順について詳しくは、 API を使用したデータセットの作成に関するチュートリアルを参照してください。
ターゲット接続の作成 target-connection
ターゲット接続は、取り込まれたデータが取り込まれる宛先への接続を表します。 ターゲット接続を作成するには、データレイクに関連付けられた固定接続仕様 ID を提供する必要があります。この接続仕様 ID は c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。
これで、ターゲットスキーマとターゲットデータセット、およびデータレイクへの接続仕様 ID の一意の識別子が得られました。これらの識別子を使用すると、受信ソースデータを格納するデータセットを指定する Flow Service API を使用して、ターゲット接続を作成することができます。
API 形式
POST /targetConnections
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Target Connection for a Cloud Storage connector",
"description": "Target Connection for a Cloud Storage connector",
"data": {
"schema": {
"id": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"version": "application/vnd.adobe.xed-full+json;version=1"
}
},
"params": {
"dataSetId": "5f3c3cedb2805c194ff0b69a"
},
"connectionSpec": {
"id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
"version": "1.0"
}
}'
data.schema.id
$id
。data.schema.version
application/vnd.adobe.xed-full+json;version=1
に設定する必要があります。params.dataSetId
connectionSpec.id
c604ff05-7f1a-43c0-8e18-33bf874cb11c
です。応答
リクエストが成功した場合は、新しいターゲット接続の一意の ID(id
)が返されます。この ID は、後の手順で必要になります。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
マッピングの作成 mapping
ソースデータをターゲットデータセットに取り込むには、まず、ターゲットデータセットが準拠するターゲットスキーマにマッピングする必要があります。
マッピングセットを作成するには、ターゲット XDM スキーマ $id
および作成するマッピングセットの詳細を入力する際に、Data Prep API の mappingSets
エンドポイントに POST リクエストを行います。
API 形式
POST /conversion/mappingSets
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
-H 'Authorization: Bearer {ACCESS_TOKEN}' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"version": 0,
"xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/995dabbea86d58e346ff91bd8aa741a9f36f29b1019138d4",
"xdmVersion": "1.0",
"id": null,
"mappings": [
{
"destinationXdmPath": "_id",
"sourceAttribute": "Id",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.firstName",
"sourceAttribute": "FirstName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
},
{
"destinationXdmPath": "person.name.lastName",
"sourceAttribute": "LastName",
"identity": false,
"identityGroup": null,
"namespaceCode": null,
"version": 0
}
]
}'
xdmSchema
応答
リクエストが成功した場合は、一意の ID(id
)を含む、新しく作成されたマッピングの詳細が返されます。この値は、後の手順でデータフローを作成する際に必要になります。
{
"id": "bf5286a9c1ad4266baca76ba3adc9366",
"version": 0,
"createdDate": 1597784069368,
"modifiedDate": 1597784069368,
"createdBy": "{CREATED_BY}",
"modifiedBy": "{MODIFIED_BY}"
}
データフロー仕様の取得 specs
データフローは、ソースからデータを収集し、それらを Platform に取り込む役割を果たします。 データフローを作成するには、まず、クラウドストレージデータの収集を実行するデータフロー仕様を取得する必要があります。
API 形式
GET /flowSpecs?property=name=="CloudStorageToAEP"
リクエスト
curl -X GET \
'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name==%22CloudStorageToAEP%22' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}'
応答
応答が成功すると、ソースから Platform にデータを取り込む必要があるデータフローの仕様の詳細が返されます。応答には、新しいデータフローを作成するために必要な、一意のフロー仕様 id
が含まれます。
code language-json |
---|
|
データフローの作成
クラウドストレージデータを収集するための最後の手順は、データフローを作成することです。現時点で、次の必要な値の準備ができています。
データフローは、ソースからデータをスケジュールおよび収集する役割を果たします。ペイロードに前述の値を提供しながら POST リクエストを実行することで、データフローを作成することができます。
取り込みをスケジュールするには、まず開始時刻の値をエポック時間(秒)に設定する必要があります。次に、頻度の値を次の 5 つのオプションのいずれかに設定する必要があります。once
、minute
、hour
、day
または week
。インターバルの値には、連続した 2 回の取り込みの間の期間を指定します。1 回限りの取り込みを作成する場合は、インターバルを設定する必要はありません。その他のすべての頻度では、間隔の値を 15
以上に設定する必要があります。
API 形式
POST /flows
リクエスト
curl -X POST \
'https://platform.adobe.io/data/foundation/flowservice/flows' \
-H 'x-api-key: {API_KEY}' \
-H 'x-gw-ims-org-id: {ORG_ID}' \
-H 'x-sandbox-name: {SANDBOX_NAME}' \
-H 'Content-Type: application/json' \
-d '{
"name": "Cloud Storage flow to Platform",
"description": "Cloud Storage flow to Platform",
"flowSpec": {
"id": "9753525b-82c7-4dce-8a9b-5ccfce2b9876",
"version": "1.0"
},
"sourceConnectionIds": [
"26b53912-1005-49f0-b539-12100559f0e2"
],
"targetConnectionIds": [
"f7eb08fa-5f04-4e45-ab08-fa5f046e45ee"
],
"transformations": [
{
"name": "Mapping",
"params": {
"mappingId": "bf5286a9c1ad4266baca76ba3adc9366",
"mappingVersion": 0
}
}
],
"scheduleParams": {
"startTime": "1597784298",
"frequency":"minute",
"interval":"30"
}
}'
flowSpec.id
sourceConnectionIds
targetConnectionIds
transformations.params.mappingId
scheduleParams.startTime
scheduleParams.frequency
once
、minute
、hour
、day
、week
です。scheduleParams.interval
once
に設定されている場合、間隔は必須ではありません。また、頻度は他の頻度の値に対して、15
よりも大きいか、等しい必要があります。応答
リクエストが成功した場合は、新しく作成したデータフローの ID(id
)が返されます。
{
"id": "dbc5c132-bc2a-4625-85c1-32bc2a262558",
"etag": "\"8e000533-0000-0200-0000-5f3c40fd0000\""
}
データフローの監視
データフローが作成されると、それを通して取り込まれるデータを監視し、フローの実行状況、完了状況、エラーなどの情報を確認することができます。データフローのモニタリング方法について詳しくは、API でのデータフローの監視のチュートリアルを参照してください。
次の手順
このチュートリアルでは、ソースコネクタを作成し、スケジュールに従ってクラウドストレージからデータを収集しました。受信データは、Real-Time Customer Profile および Data Science Workspace のようなダウンストリームの Platform サービスで使用できるようになりました。詳しくは、次のドキュメントを参照してください。
付録 appendix
次の節では、様々なクラウドストレージソースコネクタとその接続仕様を示します。
接続仕様
ecadc60c-7455-4d87-84dc-2a0e293d997b
86043421-563b-46ec-8e6c-e23184711bf6
4c10e202-c428-4796-9208-5f1f5732b1cf
b3ba5556-48be-44b7-8b85-ff2b69b46dc4
bf9f5905-92b7-48bf-bf20-455bc6b60a4e
be5ec48c-5b78-49d5-b8fa-7c89ec4569b8
32e8f412-cdf7-464c-9885-78184cb113fd
54e221aa-d342-4707-bcff-7a4bceef0001
c85f9425-fb21-426c-ad0b-405e9bd8a46c
bf367b0d-3d9b-4060-b67b-0d3d9bd06094