Show Menu
主题×

通过源连接器和API从第三方数据库收集数据

Flow Service 用于收集和集中Adobe Experience Platform内不同来源的客户数据。 该服务提供用户界面和RESTful API,所有支持的源都可从中连接。
本教程介绍从第三方数据库检索数据并通过源连接 Platform 器和流服 # API将其引入(https://www.adobe.io/apis/experienceplatform/home/api-reference.html#!acpdr/swagger-specs/flow-service.yaml) 的步骤。

入门指南

本教程要求您具有与第三方数据库的有效连接,以及要引入的文件(包括文 Platform 件的路径和结构)的相关信息。 如果您没有此信息,请在尝试本教程 之前参阅教程,了解如何使用流服务 API浏览数据库。
本教程还要求您对Adobe Experience Platform的以下组件有充分的了解:
  • 体验数据模型(XDM)系统 :Experience Platform组织客户体验数据的标准化框架。
    • 模式合成基础 :了解XDM模式的基本构件,包括模式构成的主要原则和最佳做法。
    • 模式注册开发人员指南 :包括成功执行对模式注册表API的调用时需要了解的重要信息。 这包括您 {TENANT_ID} 的、“容器”的概念以及发出请求所需的标题(特别要注意“接受”标题及其可能的值)。
  • 目录服务 :目录是数据位置和谱系的记录系统 Experience Platform。
  • 批量摄取 :批处理摄取API允许您将数据作为批 Experience Platform 处理文件收录。
  • 沙箱 : Experience Platform 提供将单个实例分为单独的虚 Platform 拟环境的虚拟沙箱,以帮助开发和发展数字体验应用程序。
以下各节提供了使用流服务 API成功连接到第三方数据库 时需要了解的其他信息

读取示例API调用

本教程提供示例API调用,以演示如何设置请求的格式。 这包括路径、必需的标头和格式正确的请求负载。 还提供API响应中返回的示例JSON。 有关示例API调用文档中使用的惯例的信息,请参阅疑难解答 指南中有关如何阅读示例API调 用 Experience Platform 一节。

收集所需标题的值

要调用API,您必 Platform 须先完成身份验证 教程 。 完成身份验证教程可为所有API调用中的每个所需 Experience Platform 标头提供值,如下所示:
  • Authorization: Bearer {ACCESS_TOKEN}
  • x-api-key: {API_KEY}
  • x-gw-ims-org-id: {IMS_ORG}
中的所有资 Experience Platform源(包括属于这些资 Flow Service源)都与特定虚拟沙箱隔离。 对API的 Platform 所有请求都需要一个标头,它指定操作将在中进行的沙箱的名称:
  • x-sandbox-name: {SANDBOX_NAME}
所有包含有效负荷(POST、PUT、PATCH)的请求都需要额外的媒体类型标头:
  • Content-Type: application/json

创建源连接

您可以通过向API发出POST请求来创建源 Flow Service 连接。 源连接由连接ID、源数据文件的路径和连接规范ID组成。
要创建源连接,还必须为数据格式属性定义枚举值。
对基于文件的连接器使 用以下枚举值 :
Data.format
枚举值
分隔文件
delimited
JSON文件
json
镶木文件
parquet
对于所 有基于表的连接器 ,请使用枚举值: tabular .
API格式
POST /sourceConnections

请求
curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/sourceConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Database Source Connector",
        "baseConnectionId": "d5cbb5bc-44cc-41a2-8bb5-bc44ccf1a2fb",
        "description": "A test source connector for a third-party database",
        "data": {
            "format": "tabular",
        },
        "params": {
            "path": "ADMIN.E2E"
        },
        "connectionSpec": {
            "id": "d6b52d86-f0f8-475f-89d4-ce54c8527328",
            "version": "1.0"
        }
    }'

属性
描述
baseConnectionId
第三方数据库源的连接ID。
params.path
源文件的路径。
connectionSpec.id
第三方数据库源的连接规范ID。 有关列表 库规 范ID的,请参见附录。
响应
成功的响应会返回新创建的源 id 连接的唯一标识符()。 此ID是后续步骤中创建目标连接时必需的。
{
    "id": "2f7356d9-a866-47ea-b356-d9a86687ea7a",
    "etag": "\"c8006055-0000-0200-0000-5ecd79520000\""
}

创建目标XDM模式

在前面的步骤中,创建了一个专门的XDM模式来构造源数据。 为了在中使用源数据,还必 Platform须创建目标模式,以根据您的需要构建源数据。 然后,目标模式用于创建包含 Platform 源数据的数据集。 此目标XDM模式还扩展 XDM Individual Profile 类。
通过对目标注册表API执行POST请求,可以创 建模式XDM模式 。 如果您希望在中使用用户界 Experience Platform面, 模式编辑器教程 提供了在模式编辑器中执行类似操作的分步说明。
API格式
POST /tenant/schemas

请求
以下示例请求创建一个扩展XDM类的XDM模式 Individual Profile 符。
curl -X POST \
    'https://platform.adobe.io/data/foundation/schemaregistry/tenant/schemas' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "type": "object",
        "title": "Database Source Connector Target Schema",
        "description": "Target schema for a third-party database",
        "allOf": [
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-person-details"
            },
            {
                "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details"
            }
        ],
        "meta:containerId": "tenant",
        "meta:resourceType": "schemas",
        "meta:xdmType": "object",
        "meta:class": "https://ns.adobe.com/xdm/context/profile"
    }'

响应
成功的响应会返回新创建模式的详细信息,包括其唯一标识符( $id )。 后续步骤中需要此ID才能创建目标数据集、映射和数据流。
{
    "$id": "https://ns.adobe.com/{TENANT_ID}/schemas/c44dd18673370dbf16243ba6e6fd9ae62c7916ec10477727",
    "meta:altId": "_{TENANT_ID}.schemas.c44dd18673370dbf16243ba6e6fd9ae62c7916ec10477727",
    "meta:resourceType": "schemas",
    "version": "1.0",
    "title": "Target schema for an Oracle connector 5/26/20",
    "type": "object",
    "description": "Target schema for Database",
    "allOf": [
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-person-details",
            "type": "object",
            "meta:xdmType": "object"
        },
        {
            "$ref": "https://ns.adobe.com/xdm/context/profile-personal-details",
            "type": "object",
            "meta:xdmType": "object"
        }
    ],
    "refs": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "imsOrg": "{IMS_ORG}",
    "meta:extensible": false,
    "meta:abstract": false,
    "meta:extends": [
        "https://ns.adobe.com/xdm/context/profile-person-details",
        "https://ns.adobe.com/xdm/context/profile-personal-details",
        "https://ns.adobe.com/xdm/common/auditable",
        "https://ns.adobe.com/xdm/data/record",
        "https://ns.adobe.com/xdm/context/profile"
    ],
    "meta:xdmType": "object",
    "meta:registryMetadata": {
        "repo:createdDate": 1590523478581,
        "repo:lastModifiedDate": 1590523478581,
        "xdm:createdClientId": "{CREATED_CLIENT_ID}",
        "xdm:lastModifiedClientId": "{LAST_MODIFIED_CLIENT_ID}",
        "xdm:createdUserId": "{CREATED_USER_ID}",
        "xdm:lastModifiedUserId": "{LAST_MODIFIED_USER_ID}",
        "eTag": "34fdf36fc3029999a07270c4e7719d8a627f7e93e2fbc13888b3c11fb08983c0",
        "meta:globalLibVersion": "1.10.2.1"
    },
    "meta:class": "https://ns.adobe.com/xdm/context/profile",
    "meta:containerId": "tenant",
    "meta:tenantNamespace": "_{TENANT_ID}"
}

创建目标数据集

通过向Catalog Service API执行目标请求 ,提供有效负荷中POST模式的ID ,可以创建目标数据集。
API格式
POST /dataSets

请求
curl -X POST \
    'https://platform.adobe.io/data/foundation/catalog/dataSets?requestDataSource=true' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target dataset for a third-party database source connector",
        "schemaRef": {
            "id": "https://ns.adobe.com/{TENANT_ID}/schemas/c44dd18673370dbf16243ba6e6fd9ae62c7916ec10477727",
            "contentType": "application/vnd.adobe.xed-full-notext+json; version=1"
        }
    }'

属性
描述
schemaRef.id
目标XDM模式的ID。
响应
成功的响应会返回一个数组,其中包含格式为新创建数据集的ID "@/datasets/{DATASET_ID}" 。 数据集ID是由系统生成的只读字符串,用于在API调用中引用数据集。 按照后续步骤中的要求存储目标数据集ID,以创建目标连接和数据流。
[
    "@/dataSets/5ecd766e4bab17191b78e892"
]

创建目标连接

您现在具有数据集基础连接、目标模式和目标数据集的唯一标识符。 使用这些标识符,您可以使用 ! 流服务 API创建目标连接 ,以指定将包含入站源数据的数据集。
API格式
POST /targetConnections

请求
curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/targetConnections' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Target Connection for a third-party database source connector",
        "description": "Target Connection for a third-party database source connector",
        "data": {
            "schema": {
                "id": "https://ns.adobe.com/{TENANT_ID}/schemas/c44dd18673370dbf16243ba6e6fd9ae62c7916ec10477727",
                "version": "application/vnd.adobe.xed-full+json;version=1.0"
            }
        },
        "params": {
            "dataSetId": "5ecd766e4bab17191b78e892"
        },
            "connectionSpec": {
            "id": "c604ff05-7f1a-43c0-8e18-33bf874cb11c",
            "version": "1.0"
        }
    }'

属性
描述
data.schema.id
目标 $id XDM模式。
params.dataSetId
在上一步收集的目标数据集的ID。
connectionSpec.id
数据湖的固定连接规范ID。 此连接规范ID为: c604ff05-7f1a-43c0-8e18-33bf874cb11c .
响应
成功的响应会返回新目标连接的唯一标识符( id )。 此值是后续步骤中创建数据流的必需值。
{
    "id": "e66fdb22-06df-48ac-afdb-2206dff8ac10",
    "etag": "\"7e03773a-0000-0200-0000-5ecd768d0000\""
}

创建映射

为了将源数据引入目标数据集,必须首先将其映射到目标数据集所附加的目标模式。 这是通过对API执行POST请求而实现的,该请 Conversion Service 求具有在请求有效负荷中定义的数据映射。
API格式
POST /mappingSets

请求
curl -X POST \
    'https://platform.adobe.io/data/foundation/conversion/mappingSets' \
    -H 'Authorization: Bearer {ACCESS_TOKEN}' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "version": 0,
        "xdmSchema": "https://ns.adobe.com/{TENANT_ID}/schemas/c44dd18673370dbf16243ba6e6fd9ae62c7916ec10477727",
        "xdmVersion": "1.0",
        "id": null,
        "mappings": [
            {
                "destinationXdmPath": "person.name.fullName",
                "sourceAttribute": "NAME",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "_repo.createDate",
                "sourceAttribute": "DOB",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            },
            {
                "destinationXdmPath": "_id",
                "sourceAttribute": "ID",
                "identity": false,
                "identityGroup": null,
                "namespaceCode": null,
                "version": 0
            }
        ]
    }'

属性
描述
xdmSchema
目标 $id XDM模式。
响应
成功的响应会返回新创建的映射的详细信息,包括其唯一标识符( id )。 此ID是后续步骤中创建数据流的必需ID。
{
    "id": "d9d94124417d4df48ea3d00e28eb4327",
    "version": 0,
    "createdDate": 1590523552440,
    "modifiedDate": 1590523552440,
    "createdBy": "{CREATED_BY}",
    "modifiedBy": "{MODIFIED_BY}"
}

检索数据流规范

数据流负责从源收集数据并将其引入 Platform。 要创建数据流,必须首先对流服务 API执行GET请 求,以获得数据流规范 。 数据流规范负责从外部数据库或NoSQL系统收集数据。
API格式
GET /flowSpecs?property=name=="CRMToAEP"

请求
curl -X GET \
    'https://platform.adobe.io/data/foundation/flowservice/flowSpecs?property=name=="CRMToAEP"' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}'

响应
成功的响应会返回数据流规范的详细信息,该规范负责将数据从数据库或NoSQL系统引入 Platform。 下一步中需要此ID才能创建新数据流。
{
    "items": [
        {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "name": "CRMToAEP",
            "providerId": "0ed90a81-07f4-4586-8190-b40eccef1c5a",
            "version": "1.0",
            "transformationSpecs": [
                {
                    "name": "Copy",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "properties": {
                            "deltaColumn": {
                                "type": "object",
                                "properties": {
                                    "name": {
                                        "type": "string"
                                    },
                                    "dateFormat": {
                                        "type": "string"
                                    },
                                    "timezone": {
                                        "type": "string"
                                    }
                                },
                                "required": [
                                    "name"
                                ]
                            }
                        },
                        "required": [
                            "deltaColumn"
                        ]
                    }
                },
                {
                    "name": "Mapping",
                    "spec": {
                        "$schema": "http://json-schema.org/draft-07/schema#",
                        "type": "object",
                        "description": "defines various params required for different mapping from source to target",
                        "properties": {
                            "mappingId": {
                                "type": "string"
                            },
                            "mappingVersion": {
                                "type": "string"
                            }
                        }
                    }
                }
            ],
            "scheduleSpec": {
                "name": "PeriodicSchedule",
                "type": "Periodic",
                "spec": {
                    "$schema": "http://json-schema.org/draft-07/schema#",
                    "type": "object",
                    "properties": {
                        "startTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "endTime": {
                            "description": "epoch time",
                            "type": "integer"
                        },
                        "interval": {
                            "type": "integer"
                        },
                        "frequency": {
                            "type": "string",
                            "enum": [
                                "minute",
                                "hour",
                                "day",
                                "week"
                            ]
                        },
                        "backfill": {
                            "type": "boolean",
                            "default": true
                        }
                    },
                    "required": [
                        "startTime",
                        "frequency",
                        "interval"
                    ],
                    "if": {
                        "properties": {
                            "frequency": {
                                "const": "minute"
                            }
                        }
                    },
                    "then": {
                        "properties": {
                            "interval": {
                                "minimum": 15
                            }
                        }
                    },
                    "else": {
                        "properties": {
                            "interval": {
                                "minimum": 1
                            }
                        }
                    }
                }
            }
        }
    ]
}

创建数据流

收集数据的最后一步是创建数据流。 此时,您应准备以下必需值:
数据流负责从源调度和收集数据。 通过在有效负荷中提供先前提到的值时执行POST请求,可以创建数据流。
要计划摄取,您必须首先将开始时间值设置为纪元时间(以秒为单位)。 然后,您必须将频率值设置为以下五个选项之一: once minute hour day week 。 间隔值指定两个连续摄取之间的周期,并且创建一次摄取不需要设置间隔。 对于所有其他频率,间隔值必须设置为等于或大于 15
API格式
POST /flows

请求
curl -X POST \
    'https://platform.adobe.io/data/foundation/flowservice/flows' \
    -H 'x-api-key: {API_KEY}' \
    -H 'x-gw-ims-org-id: {IMS_ORG}' \
    -H 'x-sandbox-name: {SANDBOX_NAME}' \
    -H 'Content-Type: application/json' \
    -d '{
        "name": "Dataflow for a third-party database and Platform,
        "description": "collecting ADMIN.E2E",
        "flowSpec": {
            "id": "14518937-270c-4525-bdec-c2ba7cce3860",
            "version": "1.0"
        },
        "sourceConnectionIds": [
            "89cf81c9-47b4-463a-8f81-c947b4863afb"
        ],
        "targetConnectionIds": [
            "e66fdb22-06df-48ac-afdb-2206dff8ac10"
        ],
        "transformations": [
            {
                "name": "Copy",
                "params": {
                    "deltaColumn": {
                        "name": "updatedAt",
                        "dateFormat": "YYYY-MM-DD",
                        "timezone": "UTC"
                    }
                }
            },
            {
                "name": "Mapping",
                "params": {
                    "mappingId": "d9d94124417d4df48ea3d00e28eb4327",
                    "mappingVersion": "0"
                }
            }
        ],
        "scheduleParams": {
            "startTime": "1590523836",
            "frequency":"minute",
            "interval":"15",
            "backfill": "true"
        }
    }'

属性
描述
flowSpec.id
在上 一步中检索 的流程规范ID。
sourceConnectionIds
先前步骤中检索 的源连接ID。
targetConnectionIds
先前步骤中检索 的目标连接ID。
transformations.params.mappingId
先前步骤 中检索的映射ID。
transformations.params.deltaColum
用于区分新数据和现有数据的指定列。 增量数据将根据所选列的时间戳被摄取。 支持的日期格 deltaColumn yyyy-MM-dd HH:mm:ss 为。 如果您使用Azure表存储,则支持的格 deltaColumn 式为 yyyy-MM-ddTHH:mm:ssZ
transformations.params.mappingId
与数据库关联的映射ID。
scheduleParams.startTime
开始时间中数据流的数据时间。
scheduleParams.frequency
数据流收集数据的频率。 可接受的值包括: once minute hour day week
scheduleParams.interval
该间隔指定两个连续流运行之间的周期。 间隔的值应为非零整数。 当频率设置为时,间隔不 once 是必需的,对于其他频率值, 15 应大于或等于。
响应
成功的响应会返回新创 id 建的数据流的ID()。
{
    "id": "e0bd8463-0913-4ca1-bd84-6309134ca1f6",
    "etag": "\"04004fe9-0000-0200-0000-5ebc4c8b0000\""
}

监视数据流

创建数据流后,您可以监视通过它摄取的数据,以查看有关流运行、完成状态和错误的信息。 有关如何监视数据流的详细信息,请参阅API中 的数据流监视教程

后续步骤

按照本教程,您已创建了一个源连接器,以按计划从第三方数据库收集数据。 现在,下游服务(如和)可 Platform 以使用传入 Real-time Customer Profile 数据 Data Science Workspace。 有关更多详细信息,请参阅以下文档:

附录

以下部分列表不同的云存储源连接器及其连接规范。

连接规范

连接器名称
连接规范ID
Amazon Redshift
3416976c-a9ca-4bba-901a-1f08f66978ff
Apache Hive on Azure HDInsights
aac9bbd4-6c01-46ce-b47e-51c6f0f6db3f
Apache Spark on Azure HDInsights
6a8d82bc-1caf-45d1-908d-cadabc9d63a6
Azure Data Explorer
0479cc14-7651-4354-b233-7480606c2ac3
Azure Synapse Analytics
a49bcc7d-8038-43af-b1e4-5a7a089a7d79
Azure Table Storage
ecde33f2-c56f-46cc-bdea-ad151c16cd69
CouchBase
1fe283f6-9bec-11ea-bb37-0242ac130002
Google BigQuery
3c9b37f8-13a6-43d8-bad3-b863b941fedd
IBM DB2
09182899-b429-40c9-a15a-bf3ddbc8ced7
MariaDB
000eb99-cd47-43f3-827c-43caf170f015
Microsoft SQL Server
1f372ff9-38a4-4492-96f5-b9a4e4bd00ec
MySQL
26d738e0-8963-47ea-aadf-c60de735468a
Oracle
d6b52d86-f0f8-475f-89d4-ce54c8527328
Phoenix
102706fb-a5cd-42ee-afe0-bc42f017ff43
PostgreSQL
74a1c565-4e59-48d7-9d67-7c03b8a13137