1970 年代的许多计算概念已经过时,但ETL (Extract-Transform-Load)及其最近的 anagram shuffle ELT并非如此,它在目的地与飞行中操纵数据。ETL 和 ELT 传统上是计划的批处理操作,但随着对始终在线、始终最新的数据服务的需求成为常态,在数据流上操作的实时 ELT 是许多组织的目标——如果不是现实的话。
在实际使用中,ETL 中的“T”代表由原始操作组装而成的各种模式。在本文中,我们将探索这些操作并查看如何将它们实现为 SQL 语句的示例。
是的!SQL 将声明性语言的强大和简洁性与任何使用代码或数据的人的普遍技能相结合。与您可能用作替代的几乎任何编程语言不同,SQL 的普及要归功于将近 50 年的寿命——计算行业中的几乎每个人都曾在某个时候使用过它。SQL 的强大功能和普遍性意味着它无处不在,甚至在构建最新开发人员技术和服务的公司中也是如此。当通过函数增强时,SQL 变得更加强大。
过滤器从流中删除不需要的记录,删除与 SQL where子句中的“规则”不匹配的记录。过滤器通常用于抑制敏感记录以确保合规性,或减少目标系统上的处理负载或存储需求。
Filter only records pertaining to the application insert into application_events select * from http_eventswhere hostname = 'app.decodable.co' Filter only records that modify the inventory insert into inventory_updates select * from http_eventswhere hostname = 'api.mycompany.com' and path like '/v1/inventory%' and method in ( 'POST', 'PUT', 'DELETE', 'PATCH' )
Route 模式从一个或多个输入流创建多个输出流,根据一组规则将记录定向到正确的目的地。此模式实际上由多个过滤器组成,它们都可以查看每个输入记录,但每个过滤器仅传输与该特定目的地的规则匹配的那些记录。
Route security-related HTTP events insert into security_events select * from http_eventswhere path like '/login%' or path like '/billing/cc%' Route app-related HTTP events insert into application_events select * from http_eventswhere hostname = 'app.decodable.co' Route requests to Customer Success if it looks like the user needs help insert into cs_alerts select * from http_events where response_code between 500 and 599 or -- any server failure ( path = '/signup' and response_code != 200 ) or -- failed to sign up for any reason
转换管道通过修改输入记录来创建输出记录。通常这将导致 1:1 传输,但在某些情况下,输出来自多个输入记录,因此可能存在 1:many 关系。在这里,我们将调用三个专门的转换:
Parse timestamp and action insert into user_events select to_date(fields['ts'], 'YYYY-MM-DD''T''HH:MI:SS') as ts, fields['user_id'] as user_id, fields['path'] as path, case fields['method'] when 'GET' then 'read' when 'POST', 'PUT' then 'modify' when 'DELETE' then 'delete' end as actionfrom ( select grok( body, '\[${ISO8661_DATETIME:ts} ${DATA:method} "${PATH:path}" uid:${DATA:user_id}' as fields from http_event
Cleanse incoming data for downstream processes insert into sensor_readings select cast(ifnull(sensor_id, '0') as bigint) as sensor_id, lower(trim(name)) as name, cast(`value` as bigint) as reading from raw_sensor_readings
Anonymize SSNs and zip codes insert into user_events_masked select user_id, username, overlay(ssn placing '*' from 1 for 12) as ssn, substring(zip_code from 1 for 2) as zip_code_1, action from user_events
图片聚合管道通常使用 SQL 窗口函数将传入记录分组到存储桶中(通常基于时间),在这些存储桶上执行聚合操作。Count、Min、Max、Avg、Sum 是典型的运算符,但还有很多。
Count the number of events by path and status every 10 seconds. insert into site_activity select window_start, window_end, path, status, count(1) as `count` from table( tumble( table http_events, descriptor(_time), interval '10' seconds ) )group by window_start, window_end, path, status
Build hourly usage data for a Stripe integration on the output stream insert into stripe_product_usage select window_start as _time, customer_id, 'abcd1234' as price_id sum(bytes_sent) / 1024 / 1024 as mb_sentfrom table( tumble( table document_downloads, descriptor(_time), interval '1' hour ) )group by window_start, customer_idhaving mb_sent > 1024