拉链表是数据仓库设计中用来处理数据变化的一种技术,它允许保存历史数据,记录一个事物从开始到当前状态的所有变化信息,可以反映任意时间点数据的状态。本文将为您介绍基于 MaxCompute 引擎在 DataWorks 上实现拉链表 ETL 的案例。

适用场景

在设计数据仓库的数据模型时,拉链存储技术可作为一种解决方案,满足以下需求:

  • 数据量较大。

  • 表中的部分字段被更新。 例如,用户的地址、产品的描述信息、订单的状态和手机号码等。

  • 需要查看某一个时间点或时间段的历史快照信息。 例如,查看某一个订单在某一个历史时间点的状态,或查看某一个用户在过去某段时间内更新过几次等。

  • 变化的比例不大或频率不高。 假设总共有1000万个会员,且每天新增和发生变化的会员只有10万左右,如果每天都在表中保留一份全量,那么每次全量中会保存很多不变的信息,极大地浪费了存储资源。

关键字段介绍

拉链表通常包含以下几个关键字段:

字段 描述
主键 唯一标识每一行数据
业务主键 唯一标识实体的主键。例如,会员ID等
属性字段 需要追踪变化的实体属性。例如,会员昵称、会员手机号等
有效开始日期 数据版本的有效开始时间
有效结束日期 数据版本的有效结束时间。通常用一个极大值“9999-12-31”表示当前数据有效
状态标识符 标识是否为最新版本
版本 标识相同业务主键的不同历史版本号

交易订单拉链表

案例介绍

数据表设计,本案例共设计2张数据表。

  • 交易下单源表:ods_order_d,用于存放从业务库同步而来的每日增量数据。包含如下两个字段:
字段 描述
id 业务主键订单
data_status 追踪变化的订单状态
  • 交易下单事实表(拉链表):dwd_order,用于拉链存储全量有效和失效的数据。包含如下四个字段:
字段 描述
id 业务主键订单ID
data_status 追踪变化的订单状态
start_date 有效开始日期
end_date 有效结束时间

拉链表实现逻辑。

  • 本案例将使用拉链表记录电商订单从开始到当前状态(创建/支付/完成)的所有变化信息。

任务开发:增量表数据准备

在数据准备环节,需要创建 ODPS SQL 临时查询,用于产出交易下单源表数据。

ods_order_di

  • 表数据:交易下单源表,包含业务主键订单 ID 和需要追踪变化的订单状态 data_status 字段,用于存放每日新增交易下单数据。

  • ods_order_di 节点编辑页面,需要将以下测试数据,存储在 ods_order_di 交易下单源表中。

    id gmt_create   gmt_modified   data_status  pt
------ ----------   -------------- ------------ ----------
210001 2023-10-04   2023-10-04     创建         2023-10-04
210002 2023-10-04   2023-10-04     创建         2023-10-04
210001 2023-10-04   2023-10-05     支付         2023-10-05
210003 2023-10-05   2023-10-05     创建         2023-10-05
210004 2023-10-05   2023-10-05     创建         2023-10-05
210001 2023-10-04   2023-10-06     完成         2023-10-06
210002 2023-10-04   2023-10-06     支付         2023-10-06
210004 2023-10-05   2023-10-06     支付         2023-10-06
210005 2023-10-06   2023-10-06     创建         2023-10-06
  • 在插入测试数据的时候,需要按照对应的分区(pt)进行补充,操作如下:
-- 1. 创建 ods_order_di 订单源表,存放每日新增交易下单数据。
-- 补充说明:为了便于理解,我们约定案例中的“订单创建时间、修改时间字段”粒度到天级别,且默认一个订单一天仅执行一次操作;
-- 实际业务中一般时间字段粒度更细,并且一天可能有多次操作。
create table if not exists ods_order_di (
    id              bigint    comment '订单id'
    ,gmt_create     date      comment '创建时间,格式为yyyy-mm-dd'
    ,gmt_modified   date      comment '更新时间,格式为yyyy-mm-dd'
    ,data_status    string    comment '状态'
)
comment '交易下单源表'
partitioned by (
    pt  string comment '日期,格式为yyyy-mm-dd'
)
lifecycle 7;


-- 2. 初始化历史交易下单新增数据。
-- 补充说明:由于案例测试需要,我们将3天的每日新增数据一次性分别插入分区'2023-10-04、2023-10-05、2023-10-06',
-- 在实际业务场景中,一般每日更新当前业务日期分区,仅需将分区日期设置成调度参数变量即可。

insert overwrite table ods_order_di partition (pt='2023-10-04') values
        (210001,date'2023-10-04',date'2023-10-04','创建')
        ,(210002,date'2023-10-04',date'2023-10-04','创建');

insert overwrite table ods_order_di partition (pt='2023-10-05') values
        (210001,date'2023-10-04',date'2023-10-05','支付')
        ,(210003,date'2023-10-05',date'2023-10-05','创建')
        ,(210004,date'2023-10-05',date'2023-10-05','创建');

insert overwrite table ods_order_di partition (pt='2023-10-06') values
        (210001,date'2023-10-04',date'2023-10-06','完成')
        ,(210002,date'2023-10-04',date'2023-10-06','支付')
        ,(210004,date'2023-10-05',date'2023-10-06','支付')
        ,(210005,date'2023-10-06',date'2023-10-06','创建');

http://pic.opcoder.cn/MaxCompute-Zipper-Table-02.png

数据开发:拉链表的实现

创建 ODPS SQL 节点:dwd_order,用于产出交易下单事实明细表数据。

  • 表数据:交易下单事实明细表(拉链表),用于存放全量有效数据和全量失效数据。包含有效开始日期 start_date、有效结束时间 end_date、业务主键ID和追踪订单状态变化的 data_status 字段。

  • dwd_order 临时查询页面,输入如下示例代码:

create table if not exists dwd_order (
    id              bigint    comment '订单id'
    ,gmt_create     date      comment '创建时间,格式为yyyy-mm-dd'
    ,gmt_modified   date      comment '更新时间,格式为yyyy-mm-dd'
    ,data_status    string    comment '状态'
    ,start_date     date      comment '生效日期,格式为yyyy-mm-dd'
    ,end_date       date      comment '失效日期,格式为yyyy-mm-dd,9999-12-31表示有效'
)
comment '交易下单事实明细表(拉链表)';
  • dwd_order 节点编辑页面,输入如下示例代码:
-- 唯一标识:id + gmt_create + gmt_modified + data_status

-- 拉链存储的实现逻辑:业务日期当日 ods_order_di 新增或更新的所有数据插入 dwd_order,且通过 end_date 标识为有效数据
-- dwd_order 中有更新过的有效数据修改成失效数据。

insert overwrite table dwd_order
select  
    id
    ,gmt_create
    ,gmt_modified
    ,data_status
    ,start_date
    ,end_date
from (
    select  
        id
        ,gmt_create
        ,gmt_modified
        ,data_status
        ,start_date
        ,end_date
        -- 支持重跑:使用开窗函数,按 id, gmt_create, gmt_modified, data_status 分组,按 end_date 降序排序,仅取第一条数据。
        ,row_number() over (distribute by id,gmt_create,gmt_modified,data_status sort by end_date desc ) as row_num
    from (
        -- 存量历史数据与业务日期当日新增数据,通过主键关联
        select  
            a.id                as id
            ,a.gmt_create       as gmt_create
            ,a.gmt_modified     as gmt_modified
            ,a.data_status      as data_status
            ,a.start_date       as start_date
            -- 如果当日新增数据不为空且存量历史数据的 end_date 等于固定日期 '9999-12-31',则将存量历史数据的 end_date 置为业务日期
            -- 即:将存量历史数据置为失效数据
            ,case   
                when b.id is not null and cast(a.end_date as string) = '9999-12-31' 
                    then 
                        date'${biz_date}' 
                else 
                    a.end_date
            end                 as end_date
        from dwd_order as a left outer join (
            -- 当日新增数据
            select  
                id
                ,gmt_create
                ,gmt_modified
                ,data_status
            from ods_order_di t
            where pt = '${biz_date}'
                -- 若存量历史表中已存在相同的数据,则不重新纳入计算(防止数据重算而导致数据有效时间异常)
                and not exists (
                    select 1 from dwd_order p 
                    where t.id = p.id 
                        and t.gmt_create = p.gmt_create
                        and t.gmt_modified = p.gmt_modified 
                        and t.data_status = p.data_status
                )
        ) b
        on a.id = b.id
        union all 
        -- 当日新增或更新的有效数据:业务日期当日 ods_order_di 新增或更新的所有数据插入 dwd_order,且通过 end_date 标识为有效数据。
        select  
            id
            ,gmt_create
            ,gmt_modified
            ,data_status
            ,date'${biz_date}'  as start_date
            ,date'9999-12-31'   as end_date
        from ods_order_di t
        where pt = '${biz_date}'
            -- 若存量历史表中已存在相同的数据,则不重新纳入计算(防止数据重算而导致数据有效时间异常)
            and not exists (
                select 1 from dwd_order p 
                where t.id = p.id 
                    and t.gmt_create = p.gmt_create
                    and t.gmt_modified = p.gmt_modified 
                    and t.data_status = p.data_status
            )       
    ) 
)
where row_num = 1;
  • 调度参数配置,使用手动输入的方式添加调度参数 biz_date=$[yyyy-mm-dd-1]

  • 配置跨周期依赖。由于 dwd_order 表的当天数据处理依赖于自身前一日的数据产出,为了确保数据处理的正确性和连续性。通过配置自依赖,可以保障只有在上一周期的数据成功处理后,才会进行当前周期的数据加载,从而保障了数据处理流程的顺利进行。

拉链表数据加载逻辑

  1. 业务日期当日 dwd_order 表中,有被更新过的有效数据 “修改” 成失效数据;

  2. 业务日期当日 ods_order_di 表中,新增的所有数据插入 dwd_order 表中,状态标识为有效数据。

以业务日期 10.5 日拉链表数据加载为例,具体步骤如下:

  • 原始数据:在 10.4 日的拉链表记录中,存在一条订单ID为 “210001”,其状态为 “创建”。
  • 状态变更:到 10.5 日该订单状态被更新为 “支付”。
  • 数据对比逻辑:
    a. 将 10.4 日拉链表历史全量有效数据和增量表 “2023-10-05” 分区新增的数据关联查询。
    b. 更新状态判断:如果订单ID为 “210001” 的订单在 10.4 日拉链表中存在,且 end_data 为 “9999-12-31”,说明该记录有效,同时该订单出现在增量表 “2023-10-05” 的分区中,则说明状态已更新。
  • 更新处理:
    a. 需要将原拉链表中记录 “end_date=9999-12-31” 修改为当前业务日期 “end_date=2023-10-05”。
    b. 如果该订单ID未出现在增量表“2023-10-05”的分区中,则说明无更新,维持原记录不变。
  • 数据覆盖:
    完成上述步骤后,将拉链表中处理好的历史数据与增量表中新增的所有数据,一同覆盖写回拉链表中,最终产出 10.5 日拉链表数据。

拉链表的使用

  • 查询 dwd_order 表中全量订单的所有历史快照。

http://pic.opcoder.cn/MaxCompute-Zipper-Table-03.png

  • 查询 dwd_order 表中全量最新状态的有效数据,根据 end_date='9999-12-31' 查询

http://pic.opcoder.cn/MaxCompute-Zipper-Table-04.png

重跑数据验证

  • 如上图,id='210001 and gmt_modified='2023-10-06 这条记录为最新数据,现重跑 pt='2023-10-04' 分区的数据,并检查历史数据状态。

http://pic.opcoder.cn/MaxCompute-Zipper-Table-03.png

由于已存在最新的数据 start_date='2023-10-06,且存量历史数据已存在相同的记录 start_date='2023-10-04 所以重跑历史数据,不会对数据产生影响。

债券评级拉链表

案例介绍

数据表设计,本案例共设计2张数据表。

  • 债券评级源表:ods_bondcredit_d,用于存放从业务库同步而来的每日全量数据。包含如下几个字段:
字段 描述
bondid 业务主键债券ID
bondcode 债券组合代码
declaredate 披露日期
changedate 变动日期
creditrank 信用等级
  • 债券评级事实表(拉链表):dwd_bondcredit,用于拉链存储全量有效和失效的数据。包含如下几个字段:
字段 描述
bondid 业务主键债券ID
bondcode 债券组合代码
declare_date 披露日期
start_date 有效开始日期
end_date 有效结束时间
creditrank 信用等级
status 数据状态(0: 无效;1:有效)

拉链表实现逻辑。

  • 本案例将使用拉链表记录债券评级从开始到当前状态的所有变动信息。

任务开发:增量表数据准备

  • 表数据:债券评级源表,包含业务主键债券 ID 和债券评级 creditrank 等 字段,用于存放全量债券评级数据。

  • 将数据写入增量表

create table if not exists ods_bondcredit_d (
    bondid          bigint  comment '债券ID'
    ,bondcode       string  comment '债券组合代码'
    ,declaredate    date    comment '披露日期'
    ,changedate     date    comment '变动日期'
    ,creditrank     string  comment '信用等级'
)
comment '债券评级源表'
partitioned by (
    ds            string comment '日期,格式为yyyy-mm-dd'
)
lifecycle 7;


insert overwrite table ods_bondcredit_d partition (ds = '2023-10-01') 
select  
    cast(bond_cgchg_id  as bigint)  as  bondid 
    ,bondcode
    ,cast(declaredate   as date)    as declaredate
    ,cast(changedate    as date)    as changedate
    ,creditrank
from (
    select 
        t.*
        ,row_number() over(partition by bondcode order by changedate) rn
    from (
        select * from v_cjht_finchina_bond_cgchg 
        where ds = '99991231' 
            and bondcode in (
                '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ'
            )
            and cevaluitcode in (
                '80002192', '80117870', '80002172', '80129270', 
                '80062388', '80002154', '80000858', '80002099'
            )
    ) t
) 
where rn = 1;

insert overwrite table ods_bondcredit_d partition (ds = '2023-10-02') 
select  
    cast(bond_cgchg_id  as bigint)  as  bondid 
    ,bondcode
    ,cast(declaredate   as date)    as declaredate
    ,cast(changedate    as date)    as changedate
    ,creditrank
from (
    select 
        t.*
        ,row_number() over(partition by bondcode order by changedate) rn
    from (
        select * from v_cjht_finchina_bond_cgchg 
        where ds = '99991231' 
            and bondcode in (
                '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ'
            )
            and cevaluitcode in (
                '80002192', '80117870', '80002172', '80129270', 
                '80062388', '80002154', '80000858', '80002099'
            )
    ) t
) 
where rn = 2;

insert overwrite table ods_bondcredit_d partition (ds = '2023-10-03') 
select  
    cast(bond_cgchg_id  as bigint)  as  bondid 
    ,bondcode
    ,cast(declaredate   as date)    as declaredate
    ,cast(changedate    as date)    as changedate
    ,creditrank
from (
    select 
        t.*
        ,row_number() over(partition by bondcode order by changedate) rn
    from (
        select * from v_cjht_finchina_bond_cgchg 
        where ds = '99991231' 
            and bondcode in (
                '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ'
            )
            and cevaluitcode in (
                '80002192', '80117870', '80002172', '80129270', 
                '80062388', '80002154', '80000858', '80002099'
            )
    ) t
) 
where rn = 3;

insert overwrite table ods_bondcredit_d partition (ds = '2023-10-04') 
select  
    cast(bond_cgchg_id  as bigint)  as  bondid 
    ,bondcode
    ,cast(declaredate   as date)    as declaredate
    ,cast(changedate    as date)    as changedate
    ,creditrank
from (
    select 
        t.*
        ,row_number() over(partition by bondcode order by changedate) rn
    from (
        select * from v_cjht_finchina_bond_cgchg 
        where ds = '99991231' 
            and bondcode in (
                '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ'
            )
            and cevaluitcode in (
                '80002192', '80117870', '80002172', '80129270', 
                '80062388', '80002154', '80000858', '80002099'
            )
    ) t
) 
where rn = 4;

insert overwrite table ods_bondcredit_d partition (ds = '2023-10-05') 
select  
    cast(bond_cgchg_id  as bigint)  as  bondid 
    ,bondcode
    ,cast(declaredate   as date)    as declaredate
    ,cast(changedate    as date)    as changedate
    ,creditrank
from (
    select 
        t.*
        ,row_number() over(partition by bondcode order by changedate) rn
    from (
        select * from v_cjht_finchina_bond_cgchg 
        where ds = '99991231' 
            and bondcode in (
                '20180096001FCW', '21030300002GRT', '21030600001SKZ', '18030200002ATZ'
            )
            and cevaluitcode in (
                '80002192', '80117870', '80002172', '80129270', 
                '80062388', '80002154', '80000858', '80002099'
            )
    ) t
) 
where rn = 5;

http://pic.opcoder.cn/MaxCompute-Zipper-Table-06.png

数据开发:拉链表的实现

创建 ODPS SQL 节点:dwd_bondcredit,用于产出债券评级事实明细表数据。

  • 表数据:债券评级事实明细表(拉链表),用于存放全量有效数据和全量失效数据。包含有效开始日期 start_date、有效结束时间 end_date 等字段。

  • dwd_bondcredit 临时查询页面,输入如下示例代码:

create table if not exists dwd_bondcredit (
    bondid          bigint  comment '债券ID'
    ,bondcode       string  comment '债券组合代码'
    ,declare_date   date    comment '披露日期'
    ,start_date     date    comment '生效日期,格式为yyyy-mm-dd'
    ,end_date       date    comment '失效日期,格式为yyyy-mm-dd'
    ,creditrank     string  comment '信用等级'
    ,status         string  comment '状态'
)
comment '债券评级事实明细表(拉链表)';

dwd_bondcredit 节点编辑页面,输入如下示例代码:

唯一标识:bondid

insert overwrite table dwd_bondcredit
select  
    bondid
    ,bondcode
    ,declare_date
    ,start_date
    ,end_date
    ,creditrank
    ,status
from (
    select  
        a.bondid            as bondid
        ,a.bondcode         as bondcode
        ,a.declare_date     as declare_date
        ,a.start_date       as start_date
        ,case   
            when b.bondid is not null and a.status = '1' 
                then 
                    b.changedate
            else 
                a.end_date
        end                 as end_date
        ,a.creditrank       as creditrank
        ,case 
            when b.bondid is not null and a.status = '1'
                then 
                    '0'
            else 
                a.status 
        end                 as status 
    from dwd_bondcredit a left outer join (
        -- 当日新增数据
        select  
            bondid
            ,bondcode
            ,declaredate
            ,changedate
            ,creditrank
        from ods_bondcredit_d t
        where ds = '${biz_date}'
            and not exists (
                select 1 from dwd_bondcredit p 
                where t.bondid = p.bondid
            )
    ) b
        on a.bondid = b.bondid
    union all 
    select  
        bondid
        ,bondcode
        ,declaredate
        ,changedate         as start_date
        ,date'9999-12-31'   as end_date
        ,creditrank         as creditrank
        ,'1'                as status  
    from ods_bondcredit_d t
    where ds = '${biz_date}'
        and not exists (
            select 1 from dwd_bondcredit p 
            where t.bondid = p.bondid
        )       
);

结果查询

http://pic.opcoder.cn/MaxCompute-Zipper-Table-07.png

将存量历史表变更为时间拉链表

保留历史变动信息

存在如下数据,同一个 bondcode,存在多个 changedate 字段值。现通过 changedate 字段衍生出 begin_dt 和 end_dt 时间拉链字段,并保留所有 changedate 历史数据。

http://pic.opcoder.cn/MaxCompute-Zipper-Table-09.png

with 
-- 删除表中的重复数据
a as (
    select * from (
        select 
            t.* 
            ,row_number() over(partition by bondcode, cevaluitcode, changedate order by changedate) rna 
        from v_cjht_finchina_bond_cgchg t 
        where t.bondcode in (
            '21030600001SKZ', '18030200002ATZ', '19140100001TYY'
        ) 
    )
    where rna = 1
),
--按照 bondcode 和 cevaluitcode 字段分组并按照 changedate 排序
b as (
    select 
        t.* 
        ,row_number() over(partition by bondcode, cevaluitcode order by changedate) rnb
    from a t 
)
-- 向下自关联
select 
    t1.bondcode
    ,cast(t1.declaredate    as date)    as declaredate
    ,cast(t1.changedate     as date)    as start_dt 
    ,case 
        when cast(t2.changedate as date) is null 
            then 
                date'9999-12-31'
        else 
            cast(t2.changedate as date) - 1
    end                                 as end_dt 
    ,t1.cevaluitcode
    ,t1.cevaluit    
    ,t1.creditrank
    ,case 
        when cast(t2.changedate as date) is null 
            then 
                '1'
        else 
            '0'
    end                                 as status 
from b t1 left outer join b t2
    on  t1.bondcode = t2.bondcode 
    and t1.cevaluitcode = t2.cevaluitcode 
    and t1.rnb + 1 = t2.rnb
order by t1.bondcode, t1.cevaluitcode, t1.changedate;

http://pic.opcoder.cn/MaxCompute-Zipper-Table-10.png

不保留历史变动信息

从上图中可知,bondcode='19140100001TYY' and cevaluitcode='80002172' 存在多笔相同的记录(changedate 不一样但是 creditrank 一样)。可进行合并。

基于排序后的结果集向下向上自关联,并重新编号:

where (
    -- 上下都不同
    (t1.creditrank != coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or 
    -- 与下相同与上不同    
    (t1.creditrank = coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or 
    -- 保留尾条
    t2.creditrank is null
)  

根据如下条件筛选数据:

  • t1 = t2 and t1 = t3:上下都相同,去除
  • t1 != t2 and t1 != t3:上下都不同,保留
  • t1 = t2 and t1 != t3:与下相同,与上不同,保留
  • t1 != t2 and t1 = t3:与下不同,与上相同,去除
  • 另外遇到倒数2条记录都相同的情况,尾条必须保留

http://pic.opcoder.cn/MaxCompute-Zipper-Table-11.png

向下自关联,并更新有效日期-完整代码

with 
-- 删除表中的重复数据
a as (
    select * from (
        select 
            t.* 
            ,row_number() over(partition by bondcode, cevaluitcode, changedate order by changedate) rna 
        from v_cjht_finchina_bond_cgchg t 
        where t.bondcode in (
            '21030600001SKZ', '18030200002ATZ', '19140100001TYY'
        ) 
    )
    where rna = 1
),
--按照 bondcode 和 cevaluitcode 字段分组并按照 changedate 排序
b as (
    select 
        t.bondcode
        ,cast(t.changedate  as date)    as changedate 
        ,t.cevaluitcode
        ,t.cevaluit
        ,t.creditrank
        ,row_number() over(partition by bondcode, cevaluitcode order by changedate) rnb
    from a t 
)
-- 向下向上自关联,并重新编号
,c as (
    select 
        t1.bondcode
        ,t1.changedate 
        ,t1.cevaluitcode
        ,t1.cevaluit    
        ,t1.creditrank
        ,row_number() over(partition by t1.bondcode, t1.cevaluitcode order by t1.changedate) rnc
    from b t1 left outer join b t2
        on  t1.bondcode = t2.bondcode 
        and t1.cevaluitcode = t2.cevaluitcode 
        and t1.rnb + 1 = t2.rnb
    left outer join b t3
        on  t1.bondcode = t3.bondcode 
        and t1.cevaluitcode = t3.cevaluitcode 
        and t1.rnb - 1 = t3.rnb  
    where (
        -- 上下都不同
        (t1.creditrank != coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or 
        -- 与下相同与上不同    
        (t1.creditrank = coalesce(t2.creditrank, '') and t1.creditrank != coalesce(t3.creditrank, '')) or 
        -- 保留尾条
        t2.creditrank is null
    )  
)
-- 向下自关联
select 
    t1.bondcode
    ,t1.changedate      as begin_dt 
    ,case 
        when t2.changedate is null 
            then 
                date'9999-12-31'
        else 
            t2.changedate  - 1
    end                 as end_dt 
    ,t1.cevaluitcode
    ,t1.cevaluit    
    ,t1.creditrank 
    ,case 
        when t2.changedate is null 
            then 
                '1'
        else 
            '0'
    end                 as end_dt         
from c t1 left outer join c t2
    on  t1.bondcode = t2.bondcode 
    and t1.cevaluitcode = t2.cevaluitcode 
    and t1.rnc + 1 = t2.rnc    
order by t1.bondcode, t1.cevaluitcode, t1.changedate;

http://pic.opcoder.cn/MaxCompute-Zipper-Table-13.png

参考资料

MaxCompute 任务最佳实践 > 基于 MaxCompute 实现拉链表

快照表转换成拉链表的方式

ETL 项目中变更表如何转换为拉链表

SQL 切片表转拉链表

原创文章,转载请注明出处:http://www.opcoder.cn/article/65/