Skip to main content
Version: 2.15.0

Python UDTF开发

炎凰数据平台提供了Python工具Eggfarm来辅助开发Python自定义表函数,我们先来介绍Python表函数相关API。

开发环境

Python UDTF APIs

表函数API

@udtf(is_parser=False)
class BaseFunction():
def get_name(self):
"""
:return: the name of the table function
"""
return "base"

def initialize(self, table_writer):
"""
This method will be called once for every batch
in the input table with function applied
:param table_writer: table writer for writing produced results
:return: None
"""
pass

def process(self, params, table_writer, context):
"""
This method is main body of function using input parameters.
When applying table function,
it will be called once for every row from input table.
:param params: a list containing all of the input parameters
:param context: context that maintaining more information,
which is reserved for future
:param table_writer: write the produced rows and columns
into the result table.
"table_writer" has three writing mode:
- row oriented
- column oriented
- batch oriented
in a function process, single writing mode is required.
related apis are:
- write_row(kv_pairs) (all values are saved as string type)
- write_column(column_name, column_type, [column_values])
- table_batch_iterator
:return: None
"""

示例代码中展示了表函数的所有API:

  • get_name:返回表函数的名称
  • initialize:初始化table_writer或者初始化表函数执行过程中的状态信息
  • process:表函数的主体逻辑
    • params: 表函数输入参数列表
    • table_writer: 存放处理结果
    • context: 预留参数

平台处理返回数据的接口是通过table_writer来表达的,目前包含三种写方式:

  • write_row(dict)按行写入结果,这里输入的参数只有一个,由键值对组成的字典结果,需要注意的是,这里会将所有的值转换为string类型返回
  • write_column(column_name, column_type, [column_values]): 按列写入结果
    • column_name:列名;
    • column_type:列具体类型,这里需要使用pyarrow的数据类型,在同一个process处理过程中,请不要将相同的列名赋予不同的数据类型。常用的数据类型:pyarrow.utf8()pyarrow.int64()等;
    • column_values:需要写入的列值列表,值类型需要和列类型匹配;
  • table_batch_iterator:将结果组织成生成器Generator传入table_writer中,生成器Generator调用next方法后生成的结果为pyarrow batch数据类型。

另外,我们可以添加装饰器udtf来显示定义表函数类型,用以得到更好的性能,但这并不会对表函数处理过程进行检查。

def udtf(is_lookup=False, is_parser=False, not_producing_multi_rows=False):
"""
@param is_parser: `is_parser=True` indicates this table function parses its argument and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
@param is_lookup: `is_lookup=True` indicates this table function uses its argument as lookup key and produces key value pairs as a new row. It should NOT write more than 1 row for each input if specified. Specifying it to be True helps performance.
@param not_producing_multi_rows: `not_producing_multi_rows=True` indicates this table function produces no more than one row. It should NOT write more than 1 row for each input if specified.
"""

def decorate_with_udtf(func_class):
func_class.is_parser = is_parser
func_class.is_lookup = is_lookup
func_class.not_producing_multi_rows = is_lookup or is_parser or not_producing_multi_rows
return func_class

return decorate_with_udtf

装饰器udtf提供了三个参数来表述表函数的类型:

  • is_lookup
  • is_parser
  • not_producing_multi_rows

这三个参数意在指定表函数对于任意输入参数列表process处理后,只生成不多于一行的结果。当表函数被指定这样的装饰器后,在Apply操作中,平台将节省Apply原始表的处理过程,可以假定原始表原样返回,从而提升性能。

Eggfarm

本章节介绍用于辅助开发Python表函数的工具Eggfarm。

基本使用

  • 安装
    • pip install eggfarm
    • eggfarm已经发布到公共环境
    • 安装eggfarm后会同时安装eggfarm命令
  • 使用
    • eggfarm new add_two
    • 这里add_two为表函数的名称,需要注意的是,这个名称需要和后续注册到平台中的名称保持一致

生成表函数的结构展示如下:

add_two/
├── add_two
│ ├── __init__.py
│ ├── info.toml
│ └── version.py
├── pyproject.toml
├── tests
│ ├── func_test.py
│ └── supported_signature_list.py
├── Makefile
└── README.md

进入add_two目录下执行make install && make test && make package就可以得到最简单的add_two函数打包结果,可直接上传到平台文件管理页面,然后注册使用。

那么这些文件具体代表什么呢?

pyproject.toml

Eggfarms生成的目录是基于poetry开发环境的,带有pyproject.toml文件:

[tool.poetry]
name = "add_two"
version = "0.1.0"
description = "add_two table function"
authors = []

[tool.poetry.dependencies]
python = "^3.9"

[tool.poetry.dev-dependencies]
pytest = "^6.0.1"
# depends on the udtfs package
stonewave-sql-udtfs = "^0.6.0"
toml = "^0.10.2"

[build-system]
requires = ["poetry-core>=1.0.0"]
build-backend = "poetry.core.masonry.api"

pyproject.toml需要用户加入表函数所有用到的依赖,这里已经默认将stonewave-sql-udtfstoml依赖加入到tool.poetry.dev-dependencies中,stonewave-sql-udtfs包含了所有表函数基本API,toml用于读取表函数签名。放在tool.poetry.dev-dependencies中的依赖库将不会打包到最终的结果中,可以放测试相关的应用库。对于函数实际执行中需要用到的依赖请放到tool.poetry.dependencies中,最后需要被打包到结果中。

__init__.py

Eggfarm生成了基于创建时的函数名称的表函数接口,继承自BaseFunction:

from stonewave.sql.udtfs.base_function import BaseFunction, udtf
from stonewave.sql.udtfs.logger import logger
import pyarrow


# TODO: change `is_parser` to True if your table function parses an event and extracts fields from the event.
# Each event parsing SHOULD produce exactly one event.
# by specifying `is_parser` property, the performance will be better
@udtf(is_parser=False)
class AddTwoFunction(BaseFunction):
def get_name(self):
"""
:return: the name of the table function
"""
return "add_two"

# ...

def process(self, params, table_writer, context):

"""
# method 1:
# using kv pairs to append row, kv pairs means column name and column value
# all results are appended in string datatype
num1 = params[0]
num2 = params[1]
table_writer.write_row({"add_result": num1+num2 })
# ===> | add_result |
# | num1+num2 |

"""
# method 2 (Advanced):
# using write_column to add a column using pyarrow datatype
# using extend to append multiple value for column
num1 = params[0]
num2 = params[1]
table_writer.write_column("add_result", pyarrow.int64(), [int(num1) + int(num2)])
"""

这里process生成了简单代码,示例table_writer的使用。所有日志请使用from stonewave.sql.udtfs.logger import logger作为统一的日志收集接口。

info.toml

# TODO: change the function sigature according to your needs
# to support multiple signatures, you can add more signatures to this list
# This file is for stonewave table function registration.
# it is not supported to have multiple signatures with the same number of parameters
# e.g. signature_list = [ [ "INT", "STRING", "INT",], [ "INT", "STRING", "INT", "BOOL",], [ "INT", "STRING", "INT", "BOOL", "FLOAT",],]
# current valid data types are: [STRING | INT | BOOL | FLOAT | TABLE]
signature_list = [ ["INT", "INT"] ]

注册函数包到炎凰数据平台时,需要指定函数的参数列表,info.toml便承担了这部分工作。很多时候,函数参数列表含有参数默认值,SQL在表达参数默认值时不直接,这里info.toml提供参数列表的列表signature_list,来表达含有参数默认值的情况。signature_list需要列出全部可能的参数列表。当函数没有默认值时,signature_list即为单一列表元素的列表。

tests

tests目录包含了相应的简单的测试。

  • supported_signature_list.py文件无需更改,主要用于测试中对参数的检查
  • func_test.py包含简单的测试

基于上述介绍,开发一个简单的表函数,只需要更改process方法,以及填写函数签名signature_list即可完成。但值得注意的是,打包上传的依赖需要是linux可安装的包,所以make package将下载对应依赖的linux包,这可能和你的开发环境依赖的包不一致。

针对于arm64环境的python udtf开发,在最新版的eggfarm 中支持使用make package_arm64下载arm64开发依赖包。

注册

  • 首先我们需要将Eggfarm打包的函数包上传至文件管理
caution
  • 应当上传格式为 <function_name>_table_func_<creation_datetime>.tar.gz的文件
  • 上传文件时须关闭解压
  • 在查询页面创建删除自定义Python表函数,语法支持如下:
    CREATE [OR REPLACE] FUNCTION <function_name>
    LANGUAGE PYTHON
    PACKAGE '<package_path>'

    DROP FUNCTION <function_name>
    LANGUAGE PYTHON
  • package_path是上传到文件管理的相对文件路径,当前即文件名
  • ⚠️ 注册python table function时会重启python table function server,在其他的python table function正在执行时会注册失败,需要等待其他的python table function执行完成后才能成功执行注册。

表函数process示例

抽取函数(write_row)

from pygrok import Grok

@udtf(is_parser=True)
class ParseGrokFunction(BaseFunction):
def get_name(self):
return "parse_grok"

def process(self, params, table_writer, context):
text = params[0]
grok_pattern = params[1]
grok = Grok(grok_pattern)
extracted_values = grok.match(text)

if extracted_values:
table_writer.write_row(extracted_values)

使用示例

SELECT ip FROM parse_grok('ip is 127.0.0.1', '%{IPV4:ip}')
ip
127.0.0.1

数据生成函数(write_column)

from faker import Faker
import pyarrow
from stonewave.sql.udtfs.logger import logger

fake = Faker()
# add faker provider

class FakerFunction(BaseFunction):
def get_name(self):
return "faker"

def _safe_cast(self, value, to_type, default=None):
try:
return to_type(value)
except (ValueError, TypeError):
return default

def process(self, params, table_writer, context):
row_count = max(0, self._safe_cast(params[0], int, 0))
if params[1] is None:
logger.debug("faker column name is not provided, no row is produced")
return
column_names = params[1].split(",")

for column in columns:
field_faker = getattr(fake, column, None)
field_type = _get_data_type(field_faker)
array = [field_faker() if field_faker else None for i in range(row_count)]
table_writer.write_column(column, field_type, array)
logger.debug(
"rows generated via faker",
row_count=row_count,
)

# we need use pyarrow datatype
def _get_data_type(field_faker):
if not field_faker:
return pyarrow.utf8()
else:
value = field_faker()
# bool needs to be placed before int because isinstance(True, int) == True
if isinstance(value, bool):
return pyarrow.bool_()
elif isinstance(value, int):
return pyarrow.int64()
elif isinstance(value, float):
return pyarrow.float64()
else:
return pyarrow.utf8()

使用示例

select * from faker(10, 'name, email')
nameemail
Steven Salazarldavis@gmail.com
Lydia Sanchezjohn86@hotmail.com
Brian Tuckeropatterson@hatfield-kelly.com
Donna Williamsnolansamuel@gilmore-sandoval.com
Christopher Solomonashleyjordan@green.com
Jonathan Andersonsarah09@yahoo.com
Tina Berrygregoryharris@brown.info
Jacob Lopezharrismichael@yahoo.com
Amanda Richardsonnmarshall@sanchez-lee.com

读取文件函数(table_batch_iterator)

import pyarrow as pa
import pandas as pd
from stonewave.sql.udtfs.logger import logger
from stonewave.sql.udtfs.base_function import BaseFunction
from stonewave.sql.udtfs.constants import STONEWAVE_HOME

class LoadExcelFunction(BaseFunction):
def __init__(self):
# uploaded files will be placed in
# ${STONEWAVE_HOME}/var/external_data
self.external_data_dir =
os.path.join(STONEWAVE_HOME, "var", "external_data")

def get_name(self):
return "load_excel"

def process(self, params, table_writer, context):
excel_file = params[0]
logger.debug(
"executing load_excel table function",
excel_file=excel_file,
)
try:
self.set_path(excel_file)
excel_file_path = self.get_path()

excel_data = pd.read_excel(
excel_file_path,
engine="openpyxl",
dtype=str,
)

sheets = []
for sheet_name, df in excel_data.items():
# trim spaces in column names
df.columns = df.columns.str.strip()
table = pa.Table.from_pandas(df, preserve_index=False)
batches = table.to_batches()
if batches:
sheets.append(batches[0])
table_writer.table_batch_iterator = iter(sheets)
except Exception as e:
logger.error("failed to load excel file", error=str(e))

def set_path(self, path):
logger.debug(
"executing set_path in base dataloader",
path=path,
external_data_dir=self.external_data_dir,
)
# ... check path
self.path = data_path

def get_path(self):
return self.path

使用示例

select * from load_excel('档案编号.xlsx')
户管档案编号
201102
201104
201103
201105
201201
201202
201203
201204
201205

表值函数

import pyarrow as pa

class SummarizeFunction(BaseFunction):
def __init__(self):
self.tables = []

def get_name(self):
return "summarize"

def process(self, params, table_writer, context):
assert len(params) > 0
# table valued parameter must be placed in first place
# python process will process a streaming of table batches
# summarize function needs to combine all batches to calculate results
batch = params[0]
if batch is not None:
table = pa.Table.from_batches([batch])
self.tables.append(table)
# when first parameter is None, means batch streaming is ended
else:
# when get all batch of input table, calculate summarize stats
self.summarize(table_writer)

def summarize(self, table_writer):
if self.tables:
table = pa.concat_tables(self.tables, promote=True)
df = table.to_pandas()
desc_df = df.describe(datetime_is_numeric=True)
# the df is transposed because it contains mixed type column, which is not allowed in arrow
desc_df = desc_df.transpose()
desc_df.insert(0, "fields", desc_df.index)
desc_table = pa.Table.from_pandas(desc_df, preserve_index=False)
batches = desc_table.to_batches()
table_writer.table_batch_iterator = iter(batches)
else:
return

使用示例

with cte as (select * from generate_series(1, 10))
select * from summarize(cte)
fieldscountmeanstdmin25%50%75%max
generate_series105.53.027650354097491713.255.57.7510

常见问题

1. make package失败

打包上传的文件需要下载所有直接依赖库,可以是的平台不需要联网便可安装新的表函数。然而平台运行在linux环境下,所以make package会下载所有依赖的linux平台包,需要确认开发时环境所有的依赖包及对应的版本是否在linux平台也存在。

2. 上传注册成功后,执行出错

在函数测试正确的情况下,执行出错,那么需要重启平台后台查询服务(Search Broker Pod),以便加载新的函数包。