Python UDTF开发
炎凰数据平台提供了Python工具Eggfarm来辅助开发Python自定义表函数,我们先来介绍Python表函数相关API。
开发环境
- Python >=3.9,<4.0
- Poetry >=1.0 安装文档
Python UDTF APIs
表函数API
@udtf(is_parser=False)
class BaseFunction():
def get_name(self):
"""
:return: the name of the table function
"""
return "base"
def initialize(self, table_writer):
"""
This method will be called once for every batch
in the input table with function applied
:param table_writer: table writer for writing produced results
:return: None
"""
pass
def process(self, params, table_writer, context):
"""
This method is main body of function using input parameters.
When applying table function,
it will be called once for every row from input table.
:param params: a list containing all of the input parameters
:param context: context that maintaining more information,
which is reserved for future
:param table_writer: write the produced rows and columns
into the result table.
"table_writer" has three writing mode:
- row oriented
- column oriented
- batch oriented
in a function process, single writing mode is required.
related apis are:
- write_row(kv_pairs) (all values are saved as string type)
- write_column(column_name, column_type, [column_values])
- table_batch_iterator
:return: None
"""
示例代码中展示了表函数的所有API:
get_name
:返回表函数的名称initialize
:初始化table_writer
或者初始化表函数执行过程中的状态信息process
:表函数的主体逻辑params
: 表函数输入参数列表table_writer
: 存放处理结果context
: 预留参数
平台处理返回数据的接口是通过table_writer
来表达的,目前包含三种写方式:
write_row(dict)
按行写入结果,这里输入的参数只有一个,由键值对组成的字典结果,需要注意的是,这里会将所有的值转换为string
类型返回write_column(column_name, column_type, [column_values])
: 按列写入结果column_name
:列名;column_type
:列具体类型,这里需要使用pyarrow的数据类型,在同一个process处理过程中,请不要将相同的列名赋予不同的数据类型。常用的数据类型:pyarrow.utf8()
,pyarrow.int64()
等;column_values
:需要写入的列值列表,值类型需要和列类型匹配;
table_batch_iterator
:将结果组织成生成器Generator传入table_writer
中,生成器Generator调用next
方法后生成的结果为pyarrow batch
数据类型。
另外,我们可以添加装饰器udtf
来显示定义表函数类型,用以得到更好的性能,但这并不会对表函数处理过程进行检查。
def udtf(is_lookup=False, is_parser=False, not_producing_multi_rows=False):
"""
@param is_parser: `is_parser=True` indicates this table function parses its argument and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
@param is_lookup: `is_lookup=True` indicates this table function uses its argument as lookup key and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
@param not_producing_multi_rows: `not_producing_multi_rows=True` indicates this table function produces no more than one row. It should NOT write more than 1 row for each input if specified.
"""
def decorate_with_udtf(func_class):
func_class.is_parser = is_parser
func_class.is_lookup = is_lookup
func_class.not_producing_multi_rows = is_lookup or is_parser or not_producing_multi_rows
return func_class
return decorate_with_udtf
装饰器udtf
提供了三个参数来表述表函数的类型:
is_lookup
is_parser
not_producing_multi_rows
这三个参数意在指定表函数对于任意输入参数列表process处理后,只生成不多于一行的结果。当表函数被指定这样的装饰器后,在Apply
操作中,平台将节省Apply
原始表的处理过程,可以假定原始表原样返回,从而提升性能。
Eggfarm
本章节介绍用于辅助开发Python表函数的工具Eggfarm。
基本使用
- 安装
pip install eggfarm
- eggfarm已经发布到公共环境
- 安装eggfarm后会同时安装
eggfarm
命令
- 使用
eggfarm new add_two
- 这里
add_two
为表函数的名称,需要注意的是,这个名称需要和后续注册到平台中的名称保持一致。
生成表函数的结构展示如下:
add_two/
├── add_two
│ ├── __init__.py
│ ├── info.toml
│ └── version.py
├── pyproject.toml
├── tests
│ ├── func_test.py
│ └── supported_signature_list.py
├── Makefile
└── README.md
进入add_two
目录下执行make install && make test && make package
就可以得到最简单的add_two
函数打包结果,可直接上传到平台文件管理页面,然后注册使用。
那么这些文件具体代表什么呢?
pyproject.toml
Eggfarms生成的目录是基于poetry开发环境的,带有pyproject.toml
文件:
[tool.poetry]
name = "add_two"
version = "0.1.0"
description = "add_two table function"
authors = []
[tool.poetry.dependencies]
python = "^3.9"
[tool.poetry.dev-dependencies]
pytest = "^6.0.1"
# depends on the udtfs package
stonewave-sql-udtfs = "^0.6.0"
toml = "^0.10.2"
[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"
pyproject.toml
需要用户加入表函数所有用到的依赖,这里已经默认将stonewave-sql-udtfs
和toml
依赖加入到tool.poetry.dev-dependencies
中,stonewave-sql-udtfs
包含了所有表函数基本API,toml
用于读取表函数签名。放在tool.poetry.dev-dependencies
中的依赖库将不会打包到最终的结果中,可以放测试相关的应用库。对于函数实际执行中需要用到的依赖请放到tool.poetry.dependencies
中,最后需要被打包到结果中。
__init__.py
Eggfarm生成了基于创建时的函数名称的表函数接口,继承自BaseFunction
:
from stonewave.sql.udtfs.base_function import BaseFunction, udtf
from stonewave.sql.udtfs.logger import logger
import pyarrow
# TODO: change `is_parser` to True if your table function parses an event and extracts fields from the event.
# Each event parsing SHOULD produce exactly one event.
# by specifying `is_parser` property, the performance will be better
@udtf(is_parser=False)
class AddTwoFunction(BaseFunction):
def get_name(self):
"""
:return: the name of the table function
"""
return "add_two"
# ...
def process(self, params, table_writer, context):
"""
# method 1:
# using kv pairs to append row, kv pairs means column name and column value
# all results are appended in string datatype
num1 = params[0]
num2 = params[1]
table_writer.write_row({"add_result": num1+num2 })
# ===> | add_result |
# | num1+num2 |
"""
# method 2 (Advanced):
# using write_column to add a column using pyarrow datatype
# using extend to append multiple value for column
num1 = params[0]
num2 = params[1]
table_writer.write_column("add_result", pyarrow.int64(), [int(num1) + int(num2)])
"""
这里process
生成了简单代码,示例table_writer
的使用。所有日志请使用from stonewave.sql.udtfs.logger import logger
作为统一的日志收集接口。
info.toml
# TODO: change the function sigature according to your needs
# to support multiple signatures, you can add more signatures to this list
# This file is for stonewave table function registration.
# it is not supported to have multiple signatures with the same number of parameters
# e.g. signature_list = [ [ "INT", "STRING", "INT",], [ "INT", "STRING", "INT", "BOOL",], [ "INT", "STRING", "INT", "BOOL", "FLOAT",],]
# current valid data types are: [STRING | INT | BOOL | FLOAT | TABLE]
signature_list = [ ["INT", "INT"] ]
注册函数包到炎凰数据平台时,需要指定函数的参数列表,info.toml
便承担了这部分工作。很多时候,函数参数列表含有参数默认值,SQL
在表达参数默认值时不直接,这里info.toml
提供参数列表的列表signature_list
,来表达含有参数默认值的情况。signature_list
需要列出全部可能的参数列表。当函数没有默认值时,signature_list
即为单一列表元素的列表。
tests
tests
目录包含了相应的简单的测试。
supported_signature_list.py
文件无需更改,主要用于测试中对参数的检查func_test.py
包含简单的测试
基于上述介绍,开发一个简单的表函数,只需要更改process
方法,以及填写函数签名signature_list
即可完成。但值得注意的是,打包上传的依赖需要是linux可安装的包,所以make package
将下载对应依赖的linux包,这可能和你的开发环境依赖的包不一致。
针对于arm64环境的python udtf开发,在最新版的eggfarm 中支持使用make package_arm64
下载arm64开发依赖包。
注册
- 首先我们需要将Eggfarm打包的函数包上传至文件管理
- 应当上传格式为
<function_name>_table_func_<creation_datetime>.tar.gz
的文件 - 上传文件时须
关闭解压
- 在查询页面创建删除自定义Python表函数,语法支持如下:
CREATE [OR REPLACE] FUNCTION <function_name>
LANGUAGE PYTHON
PACKAGE '<package_path>'
DROP FUNCTION <function_name>
LANGUAGE PYTHON package_path
是上传到文件管理的相对文件路径,当前即文件名- ⚠️ 注册python table function时会重启
python table function server
,在其他的python table function正在执行时会注册失败,需要等待其他的python table function执行完成后才能成功执行注册。
表函数process示例
抽取函数(write_row)
from pygrok import Grok
@udtf(is_parser=True)
class ParseGrokFunction(BaseFunction):
def get_name(self):
return "parse_grok"
def process(self, params, table_writer, context):
text = params[0]
grok_pattern = params[1]
grok = Grok(grok_pattern)
extracted_values = grok.match(text)
if extracted_values:
table_writer.write_row(extracted_values)
使用示例
SELECT ip FROM parse_grok('ip is 127.0.0.1', '%{IPV4:ip}')
ip |
---|
127.0.0.1 |
数据生成函数(write_column)
from faker import Faker
import pyarrow
from stonewave.sql.udtfs.logger import logger
fake = Faker()
# add faker provider
class FakerFunction(BaseFunction):
def get_name(self):
return "faker"
def _safe_cast(self, value, to_type, default=None):
try:
return to_type(value)
except (ValueError, TypeError):
return default
def process(self, params, table_writer, context):
row_count = max(0, self._safe_cast(params[0], int, 0))
if params[1] is None:
logger.debug("faker column name is not provided, no row is produced")
return
column_names = params[1].split(",")
for column in columns:
field_faker = getattr(fake, column, None)
field_type = _get_data_type(field_faker)
array = [field_faker() if field_faker else None for i in range(row_count)]
table_writer.write_column(column, field_type, array)
logger.debug(
"rows generated via faker",
row_count=row_count,
)
# we need use pyarrow datatype
def _get_data_type(field_faker):
if not field_faker:
return pyarrow.utf8()
else:
value = field_faker()
# bool needs to be placed before int because isinstance(True, int) == True
if isinstance(value, bool):
return pyarrow.bool_()
elif isinstance(value, int):
return pyarrow.int64()
elif isinstance(value, float):
return pyarrow.float64()
else:
return pyarrow.utf8()
使用示例
select * from faker(10, 'name, email')
name | |
---|---|
Steven Salazar | ldavis@gmail.com |
Lydia Sanchez | john86@hotmail.com |
Brian Tucker | opatterson@hatfield-kelly.com |
Donna Williams | nolansamuel@gilmore-sandoval.com |
Christopher Solomon | ashleyjordan@green.com |
Jonathan Anderson | sarah09@yahoo.com |
Tina Berry | gregoryharris@brown.info |
Jacob Lopez | harrismichael@yahoo.com |
Amanda Richardson | nmarshall@sanchez-lee.com |
读取文件函数(table_batch_iterator)
import pyarrow as pa
import pandas as pd
from stonewave.sql.udtfs.logger import logger
from stonewave.sql.udtfs.base_function import BaseFunction
from stonewave.sql.udtfs.constants import STONEWAVE_HOME
class LoadExcelFunction(BaseFunction):
def __init__(self):
# uploaded files will be placed in
# ${STONEWAVE_HOME}/var/external_data
self.external_data_dir =
os.path.join(STONEWAVE_HOME, "var", "external_data")
def get_name(self):
return "load_excel"
def process(self, params, table_writer, context):
excel_file = params[0]
logger.debug(
"executing load_excel table function",
excel_file=excel_file,
)
try:
self.set_path(excel_file)
excel_file_path = self.get_path()
excel_data = pd.read_excel(
excel_file_path,
engine="openpyxl",
dtype=str,
)
sheets = []
for sheet_name, df in excel_data.items():
# trim spaces in column names
df.columns = df.columns.str.strip()
table = pa.Table.from_pandas(df, preserve_index=False)
batches = table.to_batches()
if batches:
sheets.append(batches[0])
table_writer.table_batch_iterator = iter(sheets)
except Exception as e:
logger.error("failed to load excel file", error=str(e))
def set_path(self, path):
logger.debug(
"executing set_path in base dataloader",
path=path,
external_data_dir=self.external_data_dir,
)
# ... check path
self.path = data_path
def get_path(self):
return self.path
使用示例
select * from load_excel('档案编号.xlsx')
户管档案编号 |
---|
201102 |
201104 |
201103 |
201105 |
201201 |
201202 |
201203 |
201204 |
201205 |
表值函数
import pyarrow as pa
class SummarizeFunction(BaseFunction):
def __init__(self):
self.tables = []
def get_name(self):
return "summarize"
def process(self, params, table_writer, context):
assert len(params) > 0
# table valued parameter must be placed in first place
# python process will process a streaming of table batches
# summarize function needs to combine all batches to calculate results
batch = params[0]
if batch is not None:
table = pa.Table.from_batches([batch])
self.tables.append(table)
# when first parameter is None, means batch streaming is ended
else:
# when get all batch of input table, calculate summarize stats
self.summarize(table_writer)
def summarize(self, table_writer):
if self.tables:
table = pa.concat_tables(self.tables, promote=True)
df = table.to_pandas()
desc_df = df.describe(datetime_is_numeric=True)
# the df is transposed because it contains mixed type column, which is not allowed in arrow
desc_df = desc_df.transpose()
desc_df.insert(0, "fields", desc_df.index)
desc_table = pa.Table.from_pandas(desc_df, preserve_index=False)
batches = desc_table.to_batches()
table_writer.table_batch_iterator = iter(batches)
else:
return
使用示例
with cte as (select * from generate_series(1, 10))
select * from summarize(cte)
fields | count | mean | std | min | 25% | 50% | 75% | max |
---|---|---|---|---|---|---|---|---|
generate_series | 10 | 5.5 | 3.0276503540974917 | 1 | 3.25 | 5.5 | 7.75 | 10 |
常见问题
1. make package失败
打包上传的文件需要下载所有直接依赖库,可以是的平台不需要联网便可安装新的表函数。然而平台运行在linux环境下,所以make package
会下载所有依赖的linux平台包,需要确认开发时环境所有的依赖包及对应的版本是否在linux平台也存在。
2. 上传注册成功后,执行出错
在函数测试正确的情况下,执行出错,那么需要重启平台后台查询服务(Search Broker Pod
),以便加载新的函数包。