Drasi Sources SDK

news/2025/4/19 2:48:04/文章来源:https://www.cnblogs.com/shanyou/p/18717918

什么是Drasi数据源(Source)?

Source提供了与系统的连接,Drasi 可以将这些系统视为变化源。source 在 Drasi 中执行三个重要功能:

  • 处理源系统生成的更改日志/源,并将这些更改推送到使用该源作为输入的每个连续查询。
  • 将源更改数据转换为一致的属性图数据模型,以便订阅的 Continuous Queries 可以使用该数据,就像使用 Nodes 和 Relations 的图形一样。对于图形源(如 Gremlin),无需转换。但对于非图形源,例如 PostgreSQL 和 Kubernetes,Source 会转换数据(更多详细信息在各个 Sources 部分中提供)。
  • 为 Continuous Queries 提供一种方法,以便在启动时查询源系统以初始化 Continuous Query 结果的状态。
端到端

Drasi Sources SDK 是一个用于实现 Drasi 数据源的多语言开发工具包,目前支持 Java、.NET、Rust 等编程语言。这个 SDK 的主要目的是帮助开发者创建和管理 Drasi 平台的数据源。扩展Drasi Sources的文档参见 https://github.com/drasi-project/docs/blob/main/docs/content/how-to-guides/extend-drasi/implement-a-source.md

每个数据源由两个核心部分组成:

  1. 两个主要组件:

    a) Source Reactivator(数据源响应器):负责监控和处理数据变化

    • 监听源数据存储的变更流
    • 将数据转换为图结构
    • 将变更推送到持续查询系统
    • 支持状态存储,用于保存游标等信息

    b) Source Proxy(数据源代理):负责初始数据的获取和加载 :

    • 处理新的持续查询的初始化
    • 通过查询数据存储获取初始状态
    • 将数据转换为图结构
  2. 支持的数据操作:

    • 节点(Node)的创建和管理
    • 关系(Relation)的创建和管理
    • 属性(Properties)的管理
    • 变更事件的处理
    • 控制事件的处理
  3. 特点和优势
  • 多语言支持:提供 Java、.NET、Rust 等多种语言的实现
  • 异步处理:支持异步流式处理数据变更
  • 状态管理:提供状态存储功能,支持游标管理
  • 配置灵活:支持自定义配置属性
  • 容器化部署:支持容器化部署和管理
  • 事件驱动:基于事件驱动架构处理数据变更

SDK 的设计理念是提供一个统一的接口来实现各种数据源的接入,同时保持足够的灵活性以适应不同的使用场景。无论是简单的数据源还是复杂的数据处理系统,都可以通过这个 SDK 来实现与 Drasi 平台的集成。

实现自定义数据源的步骤

第一步:实现Source Proxy(数据源代理)

Source Proxy主要负责在查询部署时获取初始数据。它需要提供一个HTTP服务器,并实现/acquire接口来处理初始数据的加载。

using System.Runtime.CompilerServices;
using System.Text.Json.Nodes;
using Drasi.Source.SDK;
using Drasi.Source.SDK.Models;
using Microsoft.Extensions.Configuration;

var proxy = new SourceProxyBuilder()
     .UseBootstrapHandler<BootstrapHandler>()
     .Build();

await proxy.StartAsync();


class BootstrapHandler : IBootstrapHandler
{
     public BootstrapHandler(IConfiguration configuration)
     {
         Console.WriteLine($"Connection string: {configuration["connectionString"]}");
     }

    public async IAsyncEnumerable<SourceElement> Bootstrap(BootstrapRequest request, [EnumeratorCancellation]CancellationToken cancellationToken = default)
     {
         if (request.NodeLabels.Contains("Person"))
         {

     
             yield return new SourceElement("person-1", ["Person"], new JsonObject
             {
                 { "name", "Alice" },
                 { "age", 30 }
             });

            yield return new SourceElement("person-2", ["Person"], new JsonObject
             {
                 { "name", "Bob" },
                 { "age", 40 }
             });
         }

        if (request.RelationLabels.Contains("Knows"))
         {
             yield return new SourceElement("1-2", ["Knows"], new JsonObject
             {
                 { "since", 2010 }
             }, "person-1", "person-2");
         }
     }
}


数据模型

  1. SourceElement 类

    • 支持节点和关系数据
    • JSON 属性支持
    • 标签系统

第二步:实现Source Reactivator(数据源响应器)

Source Reactivator负责监控数据变化并通过Dapr的pub/sub(发布/订阅)功能将变化事件发送给其他组件。

数据变化事件格式

所有的数据变化事件都需要包含三个必须字段:

  • op:操作类型
  • payload:数据负载
  • ts_ms:时间戳(毫秒)

1. 新增数据事件格式

{"op": "i",  // i 表示 insert(插入)"payload": {"after": {"id": "001","labels": ["用户", "VIP"],"properties": {"name": "张三","age": 30}},"before": {},  // 新增时before为空"source": {"table": "node",  // node表示节点,relation表示关系"ts_ms": "1676908799000"}},"ts_ms": 1676908799000
}

2. 更新数据事件格式

{"op": "u",  // u 表示 update(更新)"payload": {"after": {"id": "001","labels": ["用户", "VIP"],"properties": {"name": "张三","age": 31}},"before": {"id": "001","labels": ["用户", "VIP"],"properties": {"name": "张三","age": 30}},"source": {"table": "node","ts_ms": "1676908799000"}},"ts_ms": 1676908799000
}

3. 删除数据事件格式

{"op": "d",  // d 表示 delete(删除)"payload": {"after": {},  // 删除时after为空"before": {"id": "001","labels": ["用户", "VIP"],"properties": {"name": "张三","age": 31}},"source": {"table": "node","ts_ms": "1676908799000"}},"ts_ms": 1676908799000
}

注册你的数据源

创建SourceProvider配置

要注册新的数据源类型,你需要创建一个SourceProvider配置文件。这个配置描述了数据源的组件和配置选项。

apiVersion: v1
kind: SourceProvider
name: MySource
spec:
   services:
     proxy:
       image: my-proxy
       externalImage: true
       dapr:
         app-port: "80"
     reactivator:
       image: my-reactivator
       externalImage: true
       deprovisionHandler: true
       dapr:
         app-port: "80"
   config_schema:
     type: object
     properties:
       connectionString:  # sample config property
         type: string

使用数据源

创建Source配置文件来使用已注册的数据源:

apiVersion: v1
kind: Source
name: test-source
spec:
   kind: MySource
   properties:
     connectionString: "my-connection-string"

部署和管理命令

# 注册数据源提供者
drasi apply -f source-provider.yaml# 查看所有可用的数据源类型
drasi list sourceprovider# 部署具体的数据源实例
drasi apply -f source.yaml

调试和验证

  1. 配置文件验证

    • 使用Drasi CLI:drasi apply --dry-run -f your-source.yaml
    • 使用VSCode插件:安装Drasi VSCode扩展,可以自动验证配置文件
  2. 常见问题排查

    • 确保Docker镜像已正确推送到镜像仓库
    • 检查服务端口配置是否正确
    • 验证数据库连接信息
    • 查看容器日志排查问题
  3. 最佳实践

    • 在开发环境中充分测试
    • 使用环境变量管理敏感信息
    • 实现健康检查接口
    • 添加详细的日志记录

常见问题

  1. Q: 如何确保数据源的安全性? A: 使用环境变量存储敏感信息,启用SSL连接,实施适当的访问控制。

  2. Q: 数据源支持哪些类型的数据变化监控? A: 支持新增(insert)、更新(update)和删除(delete)三种基本操作的监控。

  3. Q: 如何处理大量数据的初始加载? A: 考虑使用分页加载,实现断点续传,或者使用批量处理机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.hqwc.cn/news/884646.html

如若内容造成侵权/违法违规/事实不符,请联系编程知识网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【每日一题】20250216

人生就是这样,不经历鲜血淋漓的疼痛,就不会明白那些曾经让我们厌烦的说教其实是受用一生的信条。【每日一题】(多选)如图所示,在竖直半面内有一半径为 \(R\) 的圆弧轨道.半径 \(OA\) 木平、\(OB\) 竖直,一个质量为 \(m\) 的小球自 \(A\) 的正上方 \(P\) 点由静止开始自由…

250216 ABC393总结

省流:还是绿@ 说实话状态不是很好,困了zzz。 A 第一眼愣了一下,然后秒了。小学脑筋急转弯读本上会看到的。 B 第一眼愣了一下,然后秒了。然后发现题读假了。遂重写。太慢了! 还死了一发,因为找 j 的时候循环顺序错了。但凡我是醒着的就不会这样。 C 第一眼愣了一下,然后…

数据团队必读:智能数据分析文档(DataV Note)五种高效工作模式

数据项目,无论是数据分析、可视化,还是数据科学和机器学习相关的项目,通常都非常复杂,涉及多个组成部分,比如代码、数据、运行环境、SQL脚本以及分析报告等;与此同时,随着AI时代的到来,数据科学领域正经历重大变革。这对于数据科学团队来说,如何保持高效地工作模式一直…

ModbusTCP服务器类库

首先,对网络通信有一定了解,但Modbus协议的具体细节可能不太熟悉。Modbus TCP服务器需要处理多个客户端连接,正确解析请求并返回响应。因此,我需要确保服务器能够并发处理多个连接,同时正确实现Modbus协议的各种功能码。接下来,回顾之前的客户端实现,客户端主要负责构建…

2025.2.15 test

这把爆零了。主要是因为 T1 妈的充要条件是错的,然后 T2 这个“切糕”唐氏玩意忘记了,T3 没时间写了。 A 有两个长度为 \(n,m\) 的 \(1,-1\) 序列,有些位置未知,你要求方案数,使得从 \((0,0)\) 开始无论如何都能走到 \((n,m)\),且中途的和 \(\ge 0\)。有一个条件,若当前…

ClientAliveCountMax设置0可以吗

在网络运维与开发领域,"clientalivecountmax"这一术语扮演着关键角色,它涉及到客户端与服务器交互的管理层面。客户端,即通过网络接口与服务器建立通信的应用程序,而服务器则负责响应这些请求并提供所需服务。"clientalivecountmax"实质上并非直接指示…

2.14 2.15

区间奇数数目 找规律 工资平均值 遍历即可 柠檬水找零 无脑ifelse,贪心? 三角形最大周长 排序后贪心 缀点成线 避免除法,用乘法成本较低 二进制求和 py自带进制转换函数 逢二进一

nginx allow什么意思?

在Nginx的配置体系中,allow指令扮演着至关重要的角色,它作为网络安全策略的一部分,能够精准地调控对特定资源的访问权限。此指令协同 deny命令,共同构建了一套高效且灵活的访问控制机制,确保只有被授权的客户端能够触及指定内容,从而强化服务器的安全防线,抵御潜在的未授…