Python模块-pandas.DataFrame转Excel格式的bytes数据

Python模块-pandas.DataFrame转Excel格式的bytes数据

背景

pandas is an open source, BSD-licensed library providing high-performance, easy-to-use data structures and data analysis tools for the Python programming language.

数据分析工具包 pandas,是利用 Python 进行数据分析时,一个难以割舍的选项。它的官方文档 pandas/docs 示例非常丰富,而且它的作者 Wes McKinney 另外还写了一本书 Python for Data Analysis,截至目前已经更新到第 3 版,👆点击下载
中国国内也有热心网友做了免费的翻译分享,这是第 2 版,简书-《Python数据分析》2nd
Python for Data Analysis, 3rd Edition

我曾经实现过一个单机部署的 ETL (extract, transform, load) 程序,三个步骤都基于 pandas 实现。不过这个程序最常用的功能,却仅仅是定时读取一批 SQL,然后写入 Excel,最后把这些 Excel 文件作为邮件附件进行发送,真是杀鸡用牛刀😂。

其实,我个人并不太喜欢使用 Excel 文件,先不论 Excel 那缓慢的打开速度,它的一个工作表最多也只能有 1048576(2^20) 行和 16384(2^14) 列,工作表名字最多 31 个字符。所以在我看来,CSV 才是更好的选项。

在那个 ETL 中,只允许从每个源读取一个 pandasDataFrame,但在输出时,可以把多个 DataFrame 输出到一个目标里面。对于一种这类情况,即把多个 DataFrame 输出到同一个 Excel 工作簿,如果这个目标之后再作为一个源时,就不好处理了。如果这是最终的输出,而不是管道的一个中间环节,却是可以接受的。

Excel 最大的问题是工作表的规模有限,如果你的表格的规格超出 1048576×16384,也就是这个矩形不能把你数据表格完全盖住,你就得对 DataFrame 进行拆分。一般来说,我个人是不推荐做拆分的。我的建议是,一个工作簿,只开一个工作表,如果一个工作表存储不下,那就用 CSV 或者 hdf5 格式。

代码实现

TIPS 代码的最新版本在 GitHub Gist 中维护
https://gist.github.com/ChenyangGao/6a3f6177ce8da748413ce304156588f9

文件名称是 pandas_to_excel_bytes.pyPython 实现代码如下:

pandas_to_excel_bytes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
#!/usr/bin/env python3
# coding: utf-8

"""这个模块提供了工具函数,可以把 `pandas` 的 `DataFrame` 转换成
xlsx 格式的 Excel 文件的二进制数据
"""

__author__ = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 1)
__all__ = ["df_to_excel_bytes", "sql_to_excel_bytes"]

from concurrent.futures import ThreadPoolExecutor
from contextlib import contextmanager
from io import BytesIO
from os import PathLike
from typing import cast, Final, Iterable, IO, TypeVar, Union
from types import MappingProxyType, MethodType
from warnings import warn

# 安装 [pandas-stubs](https://pypi.org/project/pandas-stubs/) 以静态检查
from pandas import read_sql, DataFrame, ExcelWriter, RangeIndex


IO_T = TypeVar("IO_T", bytes, str)

# Excel 的工作表名有一些非法字符 []:*?/\\,需要把出现的非法字符变成合法字符 _
EXCEL_SHEETNAME_INVALID_CHARS_TRANSTABLE: Final[MappingProxyType] = \
MappingProxyType(dict.fromkeys(map(ord, "[]:*?/\\"), "_"))

# xlsx 格式的 Excel 文件,最大的行数
SHEET_MAX_NROWS: Final[int] = 2 ** 20
# xlsx 格式的 Excel 文件,最大的列数
SHEET_MAX_NCOLS: Final[int] = 2 ** 14
# xlsx 格式的 Excel 文件,工作表名最大的字符数
SHEETNAME_MAX_LENGTH: Final[int] = 31


def read_io(fio: IO[IO_T], /) -> IO_T:
"从头读取一个 IO 对象的数据"
pos: int = fio.tell()
try:
fio.seek(0)
return fio.read()
finally:
fio.seek(pos)


@contextmanager
def ctx_df_to_excel_bytes(**kwargs):
"""上下文管理器。
把 `pandas` 的 `DataFrame` 对象转换成 xlsx 格式的 Excel 文件的字节数据

:param kwargs: 关键字参数传给 `pandas` 的 `ExcelWriter` 构造器
> 请参考: pandas.io.Excel._base.ExcelWriter
> https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.ExcelWriter.html
:return: 上下文管理器,返回一个 `pandas` 的 `ExcelWriter` 对象,
但是增加了 3 个属性:
- bio: Excel 数据将被写入这个 `io.Bytes` 对象
- read: 从 `bio` 中读取 Excel 数据
- write: 把 `bio` 中的 Excel 数据写入一个路径

Example::
>>> import pandas as pd
>>> df = pd.DataFrame([[1,2],[3,4]])
>>> with ctx_df_to_excel_bytes() as writer:
... for i in range(10):
... df.to_excel(writer, sheet_name=str(i), index=False)
>>> data = writer.read()
>>> df2 = pd.read_excel(data, sheet_name="5")
>>> (df == df2).all(None)
True
"""
bio = kwargs["path"] = BytesIO()
def read(self) -> bytes:
return read_io(self.bio)
def write(self, path: Union[bytes, int, PathLike]) -> int:
return open(path, "wb").write(read(self))
with ExcelWriter(**kwargs) as writer:
setattr(writer, "bio", bio)
setattr(writer, "read", MethodType(read, writer))
setattr(writer, "write", MethodType(write, writer))
yield writer


def df_to_excel_bytes(
dfs: Union[DataFrame, dict[str, DataFrame], Iterable[DataFrame]]
) -> bytes:
"""把 `pandas` 的 `DataFrame` 对象转换成 xlsx 格式的 Excel 文件的字节数据

:param dfs: 一个或一批 `pandas` 的 `DataFrame` 对象
:return: Excel 文件的字节数据

Example::
>>> import pandas as pd
>>> df = pd.DataFrame([[1,2],[3,4]])
>>> data = df_to_excel_bytes([df]*10)
>>> df2 = pd.read_excel(data, sheet_name="5")
>>> (df == df2).all(None)
True
"""
dfd: dict[str, DataFrame]
if isinstance(dfs, DataFrame):
dfd = {"Sheet1": dfs}
elif isinstance(dfs, dict):
dfd = dfs
else:
dfd = {str(i): df for i, df in enumerate(dfs)}

with ctx_df_to_excel_bytes() as writer:
for old_k, df in dfd.items():
# 把名称 old_k 中的所有非法字符各自转换成下划线 _
k = old_k.translate(EXCEL_SHEETNAME_INVALID_CHARS_TRANSTABLE)
if old_k != k:
warn(f"Illegal characters found in sheetname, convert {old_k!r} to {k!r}")

index, columns = df.index, df.columns
nrows, ncols = len(index), len(columns)
index_nlevel, columns_nlevel = index.nlevels, columns.nlevels
# whether_output_index 判别了是否要在 Excel 中输出 index
# 只有当 index 和 columns 的 level 层数都是 1,并且 index 的开始值 start 为 0 且
# 步进 step 的值是 1,才不需要输出索引。
# TIPS: 如果 columns 的 level 层数大于 1,`pandas` 规定必须输出 index
whether_output_index = not (
columns_nlevel == 1 and
index_nlevel == 1 and
isinstance(index, RangeIndex) and
index.start == 0 and
index.step == 1
)

# rng_row_step: 除去 columns 的输出后,还剩多少行可供数据输出
rng_row_step: int
# rng_col_step: 除去 index 的输出后(也可能不输出),还剩多少列可供数据输出
rng_col_step: int
if whether_output_index:
# 如果 columns 的 level 层数大于 1,则会多输出一空白行
if columns_nlevel > 1:
rng_row_step = SHEET_MAX_NROWS - columns_nlevel - 1
else:
rng_row_step = SHEET_MAX_NROWS - 1
rng_col_step = SHEET_MAX_NCOLS - index_nlevel
else:
rng_row_step = SHEET_MAX_NROWS - 1
rng_col_step = SHEET_MAX_NCOLS
rng_row = range(0, nrows, rng_row_step)
rng_col = range(0, ncols, rng_col_step)

# suffix_template: 如果的行或列过大,导致一张工作表不能输出,则要
# 拆分成几张工作表,为此需要为同一个 DataFrame 输出的多个工作表
# 添加后缀,最后的格式形如({name}表示引用)
# {表名}^{行切分序号}_{列切分序号}
# 假设某个 DataFrame 被切分成了
# sheet^0_0, sheet^0_1, sheet^1_0, sheet^1_0
# 那么数据在原来的 DataFrame 中的分布则是
# |sheet^0_0 | sheet^0_1 |
# |——————————|———————————|
# |sheet^1_0 | sheet^1_0 |
suffix_template = ""
# TIPS: 工作表最多只能有 31 个字符,因此如果添加后缀后,导致和表名的组合多于
# 31 字符,那么就要从表名的尾部去掉一些字符
suffix_len = 0
if len(rng_row) > 1:
q, r = divmod(len(rng_row), 16)
l = q + (r > 0)
suffix_template += "^{0:0%dx}" % l
suffix_len += l + 1
if len(rng_col) > 1:
q, r = divmod(len(rng_col), 16)
l = q + (r > 0)
suffix_template += "_{1:0%dx}" % l
suffix_len += l + 1
if suffix_len > 31:
raise RuntimeError(f"DataFrame is too large: {old_k!r}")

if suffix_len:
for i, row0 in enumerate(rng_row):
for j, col0 in enumerate(rng_col):
suffix = suffix_template.format(i, j)
key = k[:31-suffix_len] + suffix
warn(f"DataFrame is too large, generate sub-sheet: {key!r} (of {old_k!r})")
df.iloc[row0:row0+rng_row_step, col0:col0+rng_col_step].to_excel(
writer, index=whether_output_index, sheet_name=key)
else:
key = k[:31]
if key != k:
warn(f"Sheetname is too long, convert {old_k!r} to {key!r}")
if key == "":
key = "Sheet1"
df.to_excel(writer, index=whether_output_index, sheet_name=key)

return writer.read()


def pandas_read_sqls(
sqls: Union[str, dict[str, str], Iterable[str]],
con,
read_workers: int = -1,
) -> dict[str, DataFrame]:
"""读取一批 SQL 查询语句,每个 SQL 的查询结果都是一个 `pandas` 的 `DataFrame` 对象。

:param sqls: 一个或一批 SQL 查询语句
:param con: SQL 查询的服务器连接
SQLAlchemy connectable, str, or sqlite3 connection
Using SQLAlchemy makes it possible to use any DB supported by that
library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
for engine disposal and connection closure for the SQLAlchemy connectable; str
connections are closed automatically. See
`here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
:param read_workers: 并发的查询数
- 如果小于 0 或等于 1,则并发数为 1,即不并发
- 如果等于 0,则用 ThreadPoolExecutor 默认的并发数
- 如果大于 0,则数值就是并发数

:return: 返回值的内容取决于参数 `sqls`,具体为
if isinstance(sqls, str):
return {"": pandas.read_sql(sqls, con)}
elif isinstance(sqls, dict):
return {k: pandas.read_sql(sql, con) for k, sql in sqls.items()}
else:
return {str(i): pandas.read_sql(sql, con) for i, sql in enumerate(sqls)}
"""
if isinstance(sqls, str):
return {"": read_sql(sqls, con)}
else:
if not isinstance(sqls, dict):
sqls = dict((str(i), sql) for i, sql in enumerate(sqls))
sqls = cast(dict[str, str], sqls)
if read_workers < 0 or read_workers == 1:
return {k: read_sql(sql, con) for k, sql in sqls.items()}
else:
with ThreadPoolExecutor(
None if read_workers == 0 else min(read_workers, len(sqls))
) as worker:
df_iter = worker.map(lambda sql: read_sql(sql, con), sqls)
return {k: df for k, df in zip(sqls, df_iter)}


def sql_to_excel_bytes(
sqls: Union[str, dict[str, str], Iterable[str]],
con,
read_workers: int = -1,
) -> bytes:
"""执行一些 SQL 查询语句,并把查询结果保存到同一个 xlsx 格式的 Excel 文件中,
然后读取这个 Excel 文件并返回字节数据。

:param sqls: 一个或一批 SQL 查询语句,工作表名的确定方式如下:
- 如果 `sql` 是 str,则工作表名为 "Sheet1",`sqls`是相应的 SQL
- 如果 `sql` 是 dict, 则字典的键是工作表名,值是相应的 SQL
> 注意:每个工作表的名字,不允许包含这些字符之一 []:*?/\\ ,
如果包含,则会被自动替换成 _
- 否则 `sql` 就是可迭代对象, 则值是 SQL,序号(从0开始递增)是相应的工作表名
> 注意:每个 SQL 查询返回的数据量,行数尽量 <= 1048575(==2**20-1,第 1 行是表头),
列数最好 <= 16384 (==2**14),否则会被自动拆分
:param con: SQL 查询的服务器连接
SQLAlchemy connectable, str, or sqlite3 connection
Using SQLAlchemy makes it possible to use any DB supported by that
library. If a DBAPI2 object, only sqlite3 is supported. The user is responsible
for engine disposal and connection closure for the SQLAlchemy connectable; str
connections are closed automatically. See
`here <https://docs.sqlalchemy.org/en/13/core/connections.html>`_.
:param read_workers: 并发的查询数
- 如果小于 0 或等于 1,则并发数为 1,即不并发
- 如果等于 0,则用 ThreadPoolExecutor 默认的并发数
- 如果大于 0,则数值就是并发数

:return: Excel 文件数据,bytes 形式

Example::
可以用 sqllite3 进行测试
>>> sql = '''\\
... with recursive range(x) as (
... values(0)
... union all
... select x + 1 from range where x < POW(2, 21)
... )
... select * from range;'''
>>> data = sql_to_excel_bytes([sql]*2, "sqlite:///:memory:")
...
>>> import pandas as pd
>>> df0 = pd.read_excel(data, sheet_name="0^0")
>>> df1 = pd.read_excel(data, sheet_name="0^1")
>>> df2 = pd.read_excel(data, sheet_name="0^2")
>>> df = pd.concat([df0, df1, df2], ignore_index=True)
>>> (df.x == range(2 ** 21 + 1)).all()
True
"""
dfs = pandas_read_sqls(sqls, con, read_workers)
return df_to_excel_bytes(dfs)


if __name__ == "__main__":
import doctest
doctest.testmod()

扩展阅读

1. 把 pandasDataFrame 导出各种编码格式的字符串

如果每个 pandasDataFrame,各自导出到不同的目标,那么可以运用策略模式,实现一个特别简单的统一处理函数,返回导出的 bytesstr

pandas_df_2_string 函数的参数和返回值类型
filetype outtype RETURN TYPE
0 'csv' bytes bytes
1 'csv' str str
2 'csv' None str
3 'excel' bytes bytes
4 'excel' None bytes
5 'feather' bytes bytes
6 'feather' None bytes
7 'hdf' bytes bytes
8 'hdf' None bytes
9 'html' bytes bytes
10 'html' str str
11 'html' None str
12 'json' bytes bytes
13 'json' str str
14 'json' None str
15 'latex' bytes bytes
16 'latex' str str
17 'latex' None str
18 'markdown' bytes bytes
19 'markdown' str str
20 'markdown' None str
21 'parquet' bytes bytes
22 'parquet' None bytes
23 'pickle' bytes bytes
24 'pickle' None bytes
25 'stata' bytes bytes
26 'stata' None bytes
27 'string' bytes bytes
28 'string' str str
29 'string' None str
30 'xml' bytes bytes
31 'xml' str str
32 'xml' None str
pandas_df_2_string.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
#!/usr/bin/env python3
# coding: utf-8

__author__ = "ChenyangGao <https://chenyanggao.github.io/>"
__version__ = (0, 1)
__all__ = ["pandas_df_2_string"]

from io import BytesIO, StringIO
from os import remove
from tempfile import mktemp
from uuid import uuid4


def _to_bytes(write, /, **kwds):
fio = BytesIO()
write(fio, **kwds)
fio.seek(0)
return fio.read()

def _to_str(write, /, **kwds):
fio = StringIO()
write(fio, **kwds)
fio.seek(0)
return fio.read()

def _to_str_bytes(write, /, **kwds):
return _to_str(write, **kwds).encode(encoding="utf-8")

def _call_none(write, /, **kwds):
return write(None, **kwds)

def _to_hdf_bytes(write, /, **kwds):
kwds.setdefault("key", "default")
path = mktemp(str(uuid4()))
try:
write(path, **kwds)
return open(path, "rb").read()
finally:
try:
remove(path)
except:
pass

MAP = {
"csv": {
bytes: _to_bytes,
str: _to_str,
None: _call_none,
},
"excel": {
bytes: _to_bytes,
None: _to_bytes,
},
"feather": {
bytes: _to_bytes,
None: _to_bytes,
},
"hdf": {
bytes: _to_hdf_bytes,
None: _to_hdf_bytes,
},
"html": {
bytes: _to_str_bytes,
str: _to_str,
None: _call_none,
},
"json": {
bytes: _to_bytes,
str: _to_str,
None: _call_none,
},
"latex": {
bytes: _to_str_bytes,
str: _to_str,
None: _call_none,
},
"markdown": {
bytes: _to_str_bytes,
str: _to_str,
None: _call_none,
},
"parquet": {
bytes: _to_bytes,
None: _call_none,
},
"pickle": {
bytes: _to_bytes,
None: _to_bytes,
},
"stata": {
bytes: _to_bytes,
None: _call_none,
},
"string": {
bytes: _to_str_bytes,
str: _to_str,
None: _call_none,
},
"xml": {
bytes: _to_bytes,
str: _to_str,
None: _call_none,
},
}


def pandas_df_2_string(df, filetype="csv", outtype=None, **kwds):
"""把 dataframe 导出为 bytes 或 str

:param df: pandas.DataFrame 对象
:param filetype: 文件格式,可以取值如下:
'csv', 'excel', 'feather', 'hdf', 'html', 'json', 'latex',
'markdown', 'parquet', 'pickle', 'stata', 'string', 'xml'
:param outtype: 输出格式,可以取值如右:bytes, str, None
> 注意:bytes 和 None 都是有效的,但是 str 只对某些 filetype 有效
:param kwds: 调用的方法会落实到 getattr(pandas.DataFrame, 'to_' + filetype)
,而 `kwds` 是传给这个方法的其它关键字参数

:return: 根据 outtype 的值来决定:
- outtype 是 bytes,输出类型 bytes
- outtype 是 str,输出类型 str
- outtype 是 None,输出类型由具体 filetype 决定
"""
submap = MAP.get(filetype)
if submap is None:
raise ValueError(f"Invalid filetype, expected value in {MAP.keys()}, got {filetype!r}")
outm = submap.get(outtype)
if outm is None:
raise ValueError(f"Invalid outtype for filetype {filetype!r}, expected value in {submap.keys()}, got {outtype!r}")
method = getattr(df, "to_" + filetype)
return outm(method, **kwds)

2. 基于pandas实现一个简单的ETL程序

我用的 Pythonpandas 版本如下

1
2
3
4
$ python -V
Python 3.9.12
$ python -c "import pandas;print(pandas.__version__)"
1.4.2

ETLextract 这一步,需要封装所有 pandas.read_* 方法,罗列如下

1
2
3
4
5
6
7
8
9
10
import pandas as pd

df = pd.DataFrame(
[(
"pandas." + f,
pd.__dict__[f].__doc__.split("\n", 2)[1].strip()
) for f in dir(pd) if f.startswith("read_")],
columns=["读入方法", "说明"],
)
print(df.to_markdown())
读入方法 说明
0 pandas.read_clipboard Read text from clipboard and pass to read_csv.
1 pandas.read_csv Read a comma-separated values (csv) file into DataFrame.
2 pandas.read_excel Read an Excel file into a pandas DataFrame.
3 pandas.read_feather Load a feather-format object from the file path.
4 pandas.read_fwf Read a table of fixed-width formatted lines into DataFrame.
5 pandas.read_gbq Load data from Google BigQuery.
6 pandas.read_hdf Read from the store, close it if we opened it.
7 pandas.read_html Read HTML tables into a list of DataFrame objects.
8 pandas.read_json Convert a JSON string to pandas object.
9 pandas.read_orc Load an ORC object from the file path, returning a DataFrame.
10 pandas.read_parquet Load a parquet object from the file path, returning a DataFrame.
11 pandas.read_pickle Load pickled pandas object (or any object) from file.
12 pandas.read_sas Read SAS files stored as either XPORT or SAS7BDAT format files.
13 pandas.read_spss Load an SPSS file from the file path, returning a DataFrame.
14 pandas.read_sql Read SQL query or database table into a DataFrame.
15 pandas.read_sql_query Read SQL query into a DataFrame.
16 pandas.read_sql_table Read SQL database table into a DataFrame.
17 pandas.read_stata Read Stata file into DataFrame.
18 pandas.read_table Read general delimited file into DataFrame.
19 pandas.read_xml Read XML document into a DataFrame object.

而做 ETLload 这一步,需要封装所有 pandas.DataFrame.to_* 方法,罗列如下

1
2
3
4
5
6
7
8
9
10
import pandas as pd

df = pd.DataFrame(
[(
"pandas.DataFrame." + f,
getattr(pd.DataFrame, f).__doc__.split("\n", 2)[1].strip()
) for f in dir(pd.DataFrame) if f.startswith("to_")],
columns=["写出方法", "说明"],
)
print(df.to_markdown())
写出方法 说明
0 pandas.DataFrame.to_clipboard Copy object to the system clipboard.
1 pandas.DataFrame.to_csv Write object to a comma-separated values (csv) file.
2 pandas.DataFrame.to_dict Convert the DataFrame to a dictionary.
3 pandas.DataFrame.to_excel Write object to an Excel sheet.
4 pandas.DataFrame.to_feather Write a DataFrame to the binary Feather format.
5 pandas.DataFrame.to_gbq Write a DataFrame to a Google BigQuery table.
6 pandas.DataFrame.to_hdf Write the contained data to an HDF5 file using HDFStore.
7 pandas.DataFrame.to_html Render a DataFrame as an HTML table.
8 pandas.DataFrame.to_json Convert the object to a JSON string.
9 pandas.DataFrame.to_latex Render object to a LaTeX tabular, longtable, or nested table/tabular.
10 pandas.DataFrame.to_markdown Print DataFrame in Markdown-friendly format.
11 pandas.DataFrame.to_numpy Convert the DataFrame to a NumPy array.
12 pandas.DataFrame.to_parquet Write a DataFrame to the binary parquet format.
13 pandas.DataFrame.to_period Convert DataFrame from DatetimeIndex to PeriodIndex.
14 pandas.DataFrame.to_pickle Pickle (serialize) object to file.
15 pandas.DataFrame.to_records Convert DataFrame to a NumPy record array.
16 pandas.DataFrame.to_sql Write records stored in a DataFrame to a SQL database.
17 pandas.DataFrame.to_stata Export DataFrame object to Stata dta format.
18 pandas.DataFrame.to_string Render a DataFrame to a console-friendly tabular output.
19 pandas.DataFrame.to_timestamp Cast to DatetimeIndex of timestamps, at beginning of period.
20 pandas.DataFrame.to_xarray Return an xarray object from the pandas object.
21 pandas.DataFrame.to_xml Render a DataFrame to an XML document.

ETL 的三个步骤中,最重要的是 T (transform)。我之前的项目,在 transform 这一步,专门开发了一个 DSL(Domain-specific language),会把一种相对简单的语言翻译成 Pythonpandas 操作。但是作为示例来说,这过于复杂了,等下次分享如何开发脚本语言时,我再展开这个例子。
为了简化,我把 ETL 简化成如下这样一个上下文管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
from contextlib import contextmanager
from typing import Any, Callable

from pandas.core.frame import DataFrame

@contextmanager
def ctx_dataframe_as_pipe(
read: Callable[[], DataFrame],
write: Callable[[DataFrame], Any],
):
df = read()
yield df
write(df)

其中 readwrite 在调用函数时传入,请确保传入的参数类型正确,必须是两个可调用对象,如果它们各自还有其它参数,请提前用偏函数 functools.partial 进行捆绑。


未完待续

评论

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×