為通訊協定來源建立資料流使用 Flow Service API

本教學課程涵蓋從通訊協定來源擷取資料,並使用將這些資料帶到Platform的步驟 Flow Service API.

NOTE
為了建立資料流,您必須擁有包含通訊協定來源的有效基底連線ID。 如果您沒有此ID,請參閱 來源概觀 以取得您可以用來建立基礎連線的通訊協定來源清單。

快速入門

本教學課程也要求您實際瞭解下列Adobe Experience Platform元件:

  • Experience Data Model (XDM) System:Experience Platform組織客戶體驗資料的標準化架構。

    • 結構描述組合基本概念:瞭解XDM結構描述的基本建置區塊,包括結構描述組合中的關鍵原則和最佳實務。
    • Schema Registry開發人員指南:包含您需瞭解的重要資訊,才能成功執行對Schema Registry API的呼叫。 這包括您的 {TENANT_ID}、「容器」的概念,以及發出請求所需的標頭(請特別注意Accept標頭及其可能的值)。
  • Catalog Service:目錄是Experience Platform中資料位置和譜系的記錄系統。

  • Batch ingestion:批次擷取API可讓您將資料以批次檔案的形式擷取到Experience Platform中。

  • 沙箱:Experience Platform提供的虛擬沙箱可將單一Platform執行個體分割成個別的虛擬環境,以利開發及改進數位體驗應用程式。

使用平台API

如需如何成功呼叫Platform API的詳細資訊,請參閱以下指南: Platform API快速入門.

建立來源連線 source

您可以透過向以下發出POST請求來建立來源連線: Flow Service API。 來源連線由連線ID、來源資料檔案的路徑以及連線規格ID組成。

若要建立來源連線,您也必須定義資料格式屬性的列舉值。

對檔案型聯結器使用下列列舉值:

資料格式
列舉值
已分隔
delimited
JSON
json
Parquet
parquet

對於所有表式聯結器,將值設定為 tabular.

API格式

POST /sourceConnections

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Generic OData source connection",
        "baseConnectionId": "a5c6b647-e784-4b58-86b6-47e784ab580b",
        "description": "Generic OData source connection",
        "data": {
            "format": "tabular",
        },
        "params": {
            "tableName": "Orders",
            "columns": [
                {
                "name": "OrderID",
                "type": "integer",
                "xdm": {
                    "type": "integer",
                    "minimum": -2147483648,
                    "maximum": 2147483647
                }
                },
                {
                    "name": "CustomerID",
                    "type": "string",
                    "xdm": {
                        "type": "string"
                    }
                },
                {
                    "name": "OrderDate",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                },
                {
                    "name": "ShippedDate",
                    "type": "string",
                    "meta:xdmType": "date-time",
                    "xdm": {
                        "type": "string",
                        "format": "date-time"
                    }
                }
            ]
        },
        "connectionSpec": {
            "id": "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
            "version": "1.0"
        }
    }'
屬性
說明
baseConnectionId
您通訊協定應用程式的連線ID
params.path
來源檔案的路徑。
connectionSpec.id
您通訊協定應用程式的連線規格ID。

回應

成功的回應會傳回唯一識別碼(id)。 在後續步驟中,建立目標連線時需要此ID。

{
    "id": "0a768941-ddfb-499d-b689-41ddfbf99db0",
    "etag": "\"8f00753e-0000-0200-0000-5e8a547d0000\""
}

建立目標XDM結構描述 target-schema

為了在Platform中使用來源資料,必須建立目標結構描述,以根據您的需求來建構來源資料。 然後目標結構描述會用來建立包含來源資料的Platform資料集。

您可以透過對以下對象執行POST請求來建立目標XDM結構描述: 結構描述登入API.

如需如何建立目標XDM結構的詳細步驟,請參閱以下教學課程: 使用API建立結構描述.

建立目標資料集 target-dataset

您可以透過對執行POST請求來建立目標資料集 目錄服務API,在裝載中提供目標結構描述的ID。

如需如何建立目標資料集的詳細步驟,請參閱教學課程,位於 使用API建立資料集.

建立目標連線 target-connection

目標連線代表與擷取資料著陸目的地之間的連線。 若要建立目標連線,您必須提供與Data Lake相關聯的固定連線規格ID。 此連線規格ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

您現在具有目標結構描述、目標資料集和到資料湖的連線規格ID的唯一識別碼。 使用 Flow Service API後,您可以指定這些識別碼,並將包含傳入來源資料的資料集,藉此建立目標連線。

API格式

POST /targetConnections

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for protocols",
        "description": "Target Connection for protocols",
        "data": {
            "format": "parquet_xdm",
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/e669d7aba5a02f294fafb7b269af25f7cd4a66ce59193545",
                "version" "application/vnd.adobe.xed-full+json;version=1"
            }
        },
        "params": {
            "dataSetId": "5e8a55ca53662c18af37a83a"
        },
        "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'
屬性
說明
data.schema.id
$id 目標XDM結構描述的。
params.dataSetId
上一步中產生的目標資料集的ID。 注意:建立目標連線時,您必須提供有效的資料集ID。 無效的資料集ID將會導致錯誤。
connectionSpec.id
用來連線至Data Lake的連線規格ID。 此ID為: c604ff05-7f1a-43c0-8e18-33bf874cb11c.

回應

成功回應會傳回新目標連線的唯一識別碼(id)。 在後續步驟中需要此值,才能建立資料流。

{
    "id": "576d5ecf-f114-4587-ad5e-cff1144587f4",
    "etag": "\"13013506-0000-0200-0000-5e8a56d80000\""
}

建立對應 mapping

為了將來源資料擷取到目標資料集中,必須首先將其對應到目標資料集所堅持的目標結構描述。

若要建立對應集,請向以下發出POST請求: mappingSets 的端點 Data Prep API 提供您的目標XDM結構描述時 $id 以及要建立的對應集詳細資訊。

API格式

POST /mappingSets

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/e669d7aba5a02f294fafb7b269af25f7cd4a66ce59193545",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "OrderID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "CustomerID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "EmployeeID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "createdByBatchID",
                "sourceAttribute": "OrderDate",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'
屬性
說明
xdmSchema
$id 目標XDM結構描述的。

回應

成功的回應會傳回新建立的對應詳細資訊,包括其唯一識別碼(id)。 在後續步驟中需要此ID才能建立資料流。

{
    "id": "37409d3017e24a3eb4a2dc21020f7a5b",
    "version": 0,
    "createdDate": 1586124873209,
    "modifiedDate": 1586124873209,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

查詢資料流規格 specs

資料流負責從來源收集資料,並將這些資料匯入Platform。 為了建立資料流,您必須先取得負責從通訊協定應用程式收集資料的資料流規格。

API格式

GET /flowSpecs?property=name=="CRMToAEP"

要求

curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

回應

成功的回應會傳回負責將資料從來源帶入Platform的資料流規格的詳細資料。 回應包含唯一的流量規格 id 建立新資料流時需要。

NOTE
為了簡單起見,會隱藏下列JSON回應裝載。 選取「裝載」以檢視回應裝載。
檢視裝載
code language-json
{
  "id": "14518937-270c-4525-bdec-c2ba7cce3860",
  "name": "CRMToAEP",
  "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
  "version": "1.0",
  "attributes": {
    "isSourceFlow": true,
    "flacValidationSupported": true,
    "frequency": "batch",
    "notification": {
      "category": "sources",
      "flowRun": {
        "enabled": true
      }
    }
  },
  "sourceConnectionSpecIds": [
    "3416976c-a9ca-4bba-901a-1f08f66978ff",
    "38ad80fe-8b06-4938-94f4-d4ee80266b07",
    "d771e9c1-4f26-40dc-8617-ce58c4b53702",
    "3c9b37f8-13a6-43d8-bad3-b863b941fedd",
    "cc6a4487-9e91-433e-a3a3-9cf6626c1806",
    "3000eb99-cd47-43f3-827c-43caf170f015",
    "26d738e0-8963-47ea-aadf-c60de735468a",
    "74a1c565-4e59-48d7-9d67-7c03b8a13137",
    "cfc0fee1-7dc0-40ef-b73e-d8b134c436f5",
    "4f63aa36-bd48-4e33-bb83-49fbcd11c708",
    "cb66ab34-8619-49cb-96d1-39b37ede86ea",
    "eb13cb25-47ab-407f-ba89-c0125281c563",
    "1f372ff9-38a4-4492-96f5-b9a4e4bd00ec",
    "37b6bf40-d318-4655-90be-5cd6f65d334b",
    "a49bcc7d-8038-43af-b1e4-5a7a089a7d79",
    "221c7626-58f6-4eec-8ee2-042b0226f03b",
    "a8b6a1a4-5735-42b4-952c-85dce0ac38b5",
    "6a8d82bc-1caf-45d1-908d-cadabc9d63a6",
    "aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f",
    "8e6b41a8-d998-4545-ad7d-c6a9fff406c3",
    "ecde33f2-c56f-46cc-bdea-ad151c16cd69",
    "102706fb-a5cd-42ee-afe0-bc42f017ff43",
    "09182899-b429-40c9-a15a-bf3ddbc8ced7",
    "0479cc14-7651-4354-b233-7480606c2ac3",
    "d6b52d86-f0f8-475f-89d4-ce54c8527328",
    "a8f4d393-1a6b-43f3-931f-91a16ed857f4",
    "1fe283f6-9bec-11ea-bb37-0242ac130002",
    "fcad62f3-09b0-41d3-be11-449d5a621b69",
    "ea1c2a08-b722-11eb-8529-0242ac130003",
    "35d6c4d8-c9a9-11eb-b8bc-0242ac130003",
    "ff4274f2-c9a9-11eb-b8bc-0242ac130003",
    "ba5126ec-c9ac-11eb-b8bc-0242ac130003",
    "b2e08744-4f1a-40ce-af30-7abac3e23cf3",
    "929e4450-0237-4ed2-9404-b7e1e0a00309",
    "2acf109f-9b66-4d5e-bc18-ebb2adcff8d5",
    "2fa8af9c-2d1a-43ea-a253-f00a00c74412"
  ],
  "targetConnectionSpecIds": [
    "c604ff05-7f1a-43c0-8e18-33bf874cb11c"
  ],
  "permissionsInfo": {
    "view": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "read"
        ]
      }
    ],
    "manage": [
      {
        "@type": "lowLevel",
        "name": "EnterpriseSource",
        "permissions": [
          "write"
        ]
      }
    ]
  },
  "optionSpec": {
    "name": "OptionSpec",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "errorDiagnosticsEnabled": {
          "title": "Error diagnostics.",
          "description": "Flag to enable detailed and sample error diagnostics summary.",
          "type": "boolean",
          "default": false
        },
        "partialIngestionPercent": {
          "title": "Partial ingestion threshold.",
          "description": "Percentage which defines the threshold of errors allowed before the run is marked as failed.",
          "type": "number",
          "exclusiveMinimum": 0
        }
      }
    }
  },
  "scheduleSpec": {
    "name": "PeriodicSchedule",
    "type": "Periodic",
    "spec": {
      "$schema": "http://json-schema.org/draft-07/schema#",
      "type": "object",
      "properties": {
        "startTime": {
          "description": "epoch time",
          "type": "integer"
        },
        "frequency": {
          "type": "string",
          "enum": [
            "once",
            "minute",
            "hour",
            "day",
            "week"
          ]
        },
        "interval": {
          "type": "integer"
        },
        "backfill": {
          "type": "boolean",
          "default": true
        }
      },
      "required": [
        "startTime",
        "frequency"
      ],
      "if": {
        "properties": {
          "frequency": {
            "const": "once"
          }
        }
      },
      "then": {
        "allOf": [
          {
            "not": {
              "required": [
                "interval"
              ]
            }
          },
          {
            "not": {
              "required": [
                "backfill"
              ]
            }
          }
        ]
      },
      "else": {
        "required": [
          "interval"
        ],
        "if": {
          "properties": {
            "frequency": {
              "const": "minute"
            }
          }
        },
        "then": {
          "properties": {
            "interval": {
              "minimum": 15
            }
          }
        },
        "else": {
          "properties": {
            "interval": {
              "minimum": 1
            }
          }
        }
      }
    }
  },
  "transformationSpec": [
    {
      "name": "Copy",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "properties": {
          "deltaColumn": {
            "type": "object",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "deltaColumn"
        ]
      }
    },
    {
      "name": "Mapping",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for different mapping from source to target",
        "properties": {
          "mappingId": {
            "type": "string"
          },
          "mappingVersion": {
            "type": "string"
          }
        }
      }
    }
  ],
  "runSpec": {
      "name": "ProviderParams",
      "spec": {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "object",
        "description": "defines various params required for creating flow run.",
        "properties": {
          "startTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the run. The value is represented in Unix epoch time."
          },
          "windowStartTime": {
            "type": "integer",
            "description": "An integer that defines the start time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "windowEndTime": {
            "type": "integer",
            "description": "An integer that defines the end time of the window against which data is to be pulled. The value is represented in Unix epoch time."
          },
          "deltaColumn": {
            "type": "object",
            "description": "The delta column is required to partition the data and separate newly ingested data from historic data.",
            "properties": {
              "name": {
                "type": "string"
              },
              "dateFormat": {
                "type": "string"
              },
              "timezone": {
                "type": "string"
              }
            },
            "required": [
              "name"
            ]
          }
        },
        "required": [
          "startTime",
          "windowStartTime",
          "windowEndTime",
          "deltaColumn"
        ]
      }
    }
}

建立資料流

收集資料的最後一步是建立資料流。 此時,您應該已準備下列必要值:

資料流負責從來源排程及收集資料。 您可以執行POST要求,同時在裝載中提供先前提到的值,藉此建立資料流。

若要排程內嵌,您必須先將開始時間值設為以秒為單位的epoch時間。 然後,您必須將頻率值設為五個選項之一: onceminutehourday,或 week. 間隔值會指定兩個連續擷取之間的期間,而建立一次性擷取不需要設定間隔。 對於所有其他頻率,間隔值必須設定為等於或大於 15.

API格式

POST /flows

要求

curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {ORG_ID}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Creating a dataflow for a protocols source",
        "description": "Creating a dataflow for a protocols source",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "0a768941-ddfb-499d-b689-41ddfbf99db0"
        ],
        "targetConnectionIds": [
            "576d5ecf-f114-4587-ad5e-cff1144587f4"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "updatedAt",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "7409d3017e24a3eb4a2dc21020f7a5b",
                    "mappingVersion": 0
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1567411548",
            "frequency":"minute",
            "interval":"30"
        }
    }'
屬性
說明
flowSpec.id
流程規格ID 已在上一步驟中擷取。
sourceConnectionIds
來源連線ID 已在先前步驟中擷取。
targetConnectionIds
目標連線ID 已在先前步驟中擷取。
transformations.params.mappingId
對應ID 已在先前步驟中擷取。
transformations.params.deltaColum
用來區分新資料和現有資料的指定欄。 將根據所選欄的時間戳記擷取增量資料。 支援的格式 deltaColumn 使用Generic OData時 yyyy-MM-ddTHH:mm:ssZ.
transformations.params.mappingId
與資料庫關聯的對應ID。
scheduleParams.startTime
資料流的開始時間(以Epoch時間計)。
scheduleParams.frequency
資料流收集資料的頻率。 可接受的值包括: onceminutehourday,或 week.
scheduleParams.interval
間隔會指定兩個連續資料流執行之間的期間。 間隔的值應為非零整數。 當頻率設定為 once 且應大於或等於 15 其他頻率值。

回應

成功的回應會傳回ID id 建立資料流的URL名稱。

{
    "id": "8256cfb4-17e6-432c-a469-6aedafb16cd5",
    "etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}

監視資料流

建立資料流後,您可以監視透過該資料流擷取的資料,以檢視有關資料流執行、完成狀態和錯誤的資訊。 如需如何監視資料流的詳細資訊,請參閱以下教學課程: 監視API中的資料流

後續步驟

依照本教學課程中的指示,您已建立來源聯結器,以依排程從通訊協定應用程式收集資料。 傳入資料現在可供下游Platform服務使用,例如 Real-Time Customer Profile 和 Data Science Workspace. 如需更多詳細資訊,請參閱下列檔案:

recommendation-more-help
337b99bb-92fb-42ae-b6b7-c7042161d089