Custom Jupyter Magic Commands

This article uses %%sparksql as an example, so that SQL statements can be written directly in Jupyter to run Spark SQL queries (relies on the Spark Thrift Server).
Service dependency: start the Spark Thrift Server
```bash
# The default port is 10000, which conflicts with the HiveServer2 default, so change it to 10001
$SPARK_HOME/sbin/start-thriftserver.sh --hiveconf hive.server2.thrift.port=10001
```
Install the pyhive dependencies
To call the Thrift service above, we use pyhive as the client to connect, submit SQL statements, and fetch results, so first install pyhive's Python dependencies.
System dependencies
```bash
# For CentOS/RHEL systems
yum install -y gcc gcc-c++ python3-devel cyrus-sasl-devel
```
Python dependencies
```bash
# Installing the Python packages below requires build tools such as g++, so install the system packages above first
pip install pyhive pandas thrift sasl thrift-sasl sqlalchemy
```
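With the dependencies installed, a quick connectivity smoke test helps before wiring up the magic. This is a minimal sketch; the host, port, and user mirror the hive://hive@localhost:10001/default connection string used by the magic below:

```python
from pyhive import hive

# Connect to the Spark Thrift Server started earlier on localhost:10001
conn = hive.Connection(host='localhost', port=10001, username='hive', database='default')
cursor = conn.cursor()
cursor.execute('show databases')
print(cursor.fetchall())
conn.close()
```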
Define the custom magic command
All of the dependencies above are needed only to run the sparksql command; other custom magic commands do not require them.
Run the following in a Jupyter notebook to register the magic command:
```python
from IPython import get_ipython
from IPython.core.magic import Magics, magics_class, cell_magic

@magics_class
class SparkSql(Magics):
    @cell_magic
    def sparksql(self, line, cell):
        try:
            from pyhive import hive  # noqa: F401 -- lets sqlalchemy resolve the hive:// dialect
        except Exception as e:
            print('To use the sparksql magic, first install pyhive: %pip install pyhive pandas thrift sasl thrift-sasl')
            raise e
        try:
            from sqlalchemy import create_engine
        except Exception as e:
            print('To use the sparksql magic, first install sqlalchemy: %pip install sqlalchemy')
            raise e
        try:
            import pandas as pd
        except Exception as e:
            print('To use the sparksql magic, first install pandas: %pip install pandas')
            raise e
        conn_str = 'hive://hive@localhost:10001/default'
        try:
            engine = create_engine(conn_str)
        except Exception as e:
            print(f'Failed to connect to {conn_str}')
            raise e
        # e.g. select * from wl_rec_sea.dwd_sea_square_post_days_effect_stats_di where dt="2025-08-01" limit 10
        # Guard against huge result sets: append "limit 10" to select
        # queries that do not already contain a limit clause.
        lowered = cell.lower()
        if 'limit' in lowered or 'select' not in lowered:
            sql_str = cell
        else:
            sql_str = cell.strip().rstrip(';') + '\n limit 10'
        pd.options.display.max_rows = 100
        pd.options.display.max_columns = 100
        return pd.read_sql_query(sql_str, engine)

ipy = get_ipython()
ipy.register_magics(SparkSql)
```

Verify that the registration succeeded
```
%lsmagic
```
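If the registration succeeded, sparksql appears in the cell-magics section of the %lsmagic output. For a programmatic check (a minimal sketch, assuming the registration cell above has already run):

```python
from IPython import get_ipython

# register_magics(SparkSql) adds 'sparksql' to the cell-magic registry
ipy = get_ipython()
print('sparksql' in ipy.magics_manager.magics['cell'])  # expected: True
```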
Run SQL statements with the magic command
```
%%sparksql
show tables
```
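Thanks to the limit guard in the magic, a plain select without a limit clause gets "limit 10" appended before submission, so an exploratory full-table query stays cheap. For example (reusing the table from the commented example in the code above):

```
%%sparksql
select * from wl_rec_sea.dwd_sea_square_post_days_effect_stats_di where dt="2025-08-01"
```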
Persist the custom magic command
- Add the script that defines the magic command above to ~/.ipython/profile_default/startup/, saved as a .py script, e.g. 01-sparksql.py (see the %%writefile sketch below)
- Restart Jupyter and the magic takes effect permanently
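Instead of copying the file by hand, the script can be written straight from the notebook with IPython's built-in %%writefile cell magic (the path below assumes the default IPython profile):

```
%%writefile ~/.ipython/profile_default/startup/01-sparksql.py
# Paste the full magic definition from the section above here,
# i.e. everything from the imports through ipy.register_magics(SparkSql)
```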




