基于Pam-Python模块Linux SSH双因子认证的实际应用
前 言
本文记录了笔者在公司项目中需要搭建Linux系统SSH双因子认证系统过程中的采用的开源技术方案。这里并不深入讨论什么是pam-python模块,因为互联网上已经有不少前辈介绍了这个用python语言开发的Linux PAM模块,可参考:
Pam-Python实现SSH的短信双因素认证:https://www.freebuf.com/articles/web/165139.html
Linux下使用pam_python实现SSH的双因子认证登录:http://www.ipcpu.com/2016/04/linux-pam-python/
本文的重点在于介绍下采用Pam-Python模块进行简单的双因子认证的开发,谨此抛砖引玉,欢迎大家发散想法并灵活运用到自己的业务系统中。
一、什么是PAM
PAM是Pluggable Authentication Module的简称,即“可插拔认证模块”。它是Linux系统下通用的灵活运用的用户认证模块,最早由SUN 的研究人员于 1995 年提出,并在Solaris 2.3 系统上设计并部分实现。关于PAM更详细的解释,读者可以参考《鸟哥的Linux私房菜》一书中的相关章节(笔者就是最早从本书中了解到PAM的)。当然,网络上也有很多相关的高质量文章,这里不作深入介绍,可参考相关文章:
深入 Linux PAM 体系结构:https://www.ibm.com/developerworks/cn/linux/l-cn-pam/
Linux 可插拔认证模块的基本概念与架构:https://www.infoq.cn/article/wjl-linux-pluggable-authentication-module
通过PAM进行身份认证: https://www.suse.com/zh-cn/documentation/sles10/book_sle_reference/data/cha.pam.html
二、Pam-Python模块
Pam-Python是一款开源的python模块,它将需要使用C语言编写的PAM模块转换成了可以使用python语言编写,大大降低了开发难度,并且已经将认证流程python函数化,开发者只需在框架内编写少量业务代码即可调用PAM模块进行用户认证。关于Pam-Python模块详细介绍及其使用可参考其官网:
Pam-Python官网:http://pam-python.sourceforge.net/
关于Pam-Python模块的安装、配置和使用,可参考上面的文章链接《Pam-Python实现SSH的短信双因素认证》 ,在下文的业务场景中,笔者也以自己实际开发的认证模块做简单运用说明。
三、业务场景及应用
(a)SSH登录二次认证(运维、自研堡垒机)
1.安装编译依赖:
yum install pam pam-devel -y
2.下载/编译/安装pam_python模块:
下载地址:http://pam-python.sourceforge.net/ 编译:make lib 安装:cp build/lib.linux-x86_64-2.6/pam_python.so /lib64/security/
3.添加ssh_auth.py认证脚本/开启pam认证:
cp ssh_auth.py /lib64/security/ 修改/etc/pam.d/sshd,新增一行: auth requisite pam_python.so auth.py
4.sshd开启pam认证:
修改/etc/ssh/sshd_config,ChallengeResponse参数: ChallengeResponseAuthentication yes 重启SSHD服务, 测试, 如果出现错误,日志会写到/var/log/secure里面.
5.编写Pam-Python脚本ssh_auth.py :
# -*- coding=utf-8 -*- # author: s0nnet def pam_sm_authenticate(pamh, flags, argv): user = pamh.get_user() if user in ["root"]: return pamh.PAM_SUCCESS for attempt in range(0, 2): msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Enter Your PIN: ") resp = pamh.conversation(msg) if resp.resp == "heheda": return pamh.PAM_SUCCESS else: continue return pamh.PAM_AUTH_ERR def pam_sm_setcred(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_acct_mgmt(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_open_session(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_close_session(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_chauthtok(pamh, flags, argv): return pamh.PAM_SUCCESS
在这个简单的脚本中,这里仅进行简单PIN码二次认证,PIN输入错误时将无法登录,当PIN正确后会继续要求输入用户密码:
至此,一个简单的ssh登录二次认证服务器就完成了。基于Pam-Python模块的SSH二次认证模块特别适用于运维管理系统,也可运用在自研的运管系统、堡垒机等系统中。从这个简单的脚本可以看出,所有的业务逻辑只需要编写pam_sm_authenticate函数即可。所以,笔者也是大开脑洞,结合其他模块就可以实现下面这些满足其他场景的业务系统。
(2)结合Google Authenticator二次认证(随机、移动端认证)
关于Linux下使用Google Authenticator配置SSH动态验证码登录已经有了成熟开源的库,用于只需安装并简单配置即可。这里不同之处是,采用Pam-Python模块并结合双因子认证(2FA)TOTP算法的python库pyotp编程开发实现。
关于什么是双因素认证,可以参考阮一峰的双因素认证(2FA)教程:http://www.ruanyifeng.com/blog/2017/11/2fa-tutorial.html
这里不再赘述双因素认证的原理和TOTP算法的实现,仅仅简述使用pyotp库结合Google Authenticator的方法:
服务端:
- 安装
pyotp
,pip install pyotp
- 为每个用户生成一个 base32 的密钥
base32secret3232
,保存在服务器数据库中 , 代码为pyotp.random_base32()
- 服务端在匹配时使用
totp = pyotp.TOTP('base32secret3232')
和
totp.now()
即可获得当前动态口令
客户端:
客户端使用 Google Authenticator 扫描二维码即可录入信息,二维码的内容可以在服务器生成 pyotp.totp.TOTP('base32secret3232').provisioning_uri("root@s0nnet.com", issuer_name="s0nnet Blog")
base32secret3232
为之前生成的 base32 的密钥root@s0nnet.com
为用户的用户名,可以显示在客户端上s0nnet Blog
为 APP/网站 的名字
之后 Google Authenticator 会每隔 30s 就更新一次动态口令,在需要二次认证的时候传入服务器做对比。
(3)公共FTP服务器(随机、审计、流程)
这里的公共ftp访问服务器使用的业务场景是:在企业内部,售前/售后支持部门在客户现场需要远程访问公司内部资源(如更新的业务组件、程序等),但是又需要考虑到安全访问,权限管理和事件审计,在这种存在一个标准流程管理的场景下,使用Pam-Python开发的认证系统就可以保证内部员工使用受限的密码远程访问内部资源。大体的Pam-Python模块的业务与上面的“SSH登录二次认证”类型,其认证数据库采用sqlite3,同时开发一个简单的web管理系统,实现流程的管理,用户权限的限时等。
编写Pam-Python脚本ssh_auth.py示例:
# -*- coding=utf-8 -*- import datetime import sqlite3 def check_user_auth(username, userpin): conn = sqlite3.connect('/root/ms_ssh/ms_ssh.db') c = conn.cursor() sql = "SELECT pin,time_use from ssh_user where username='%s' and status=1 limit 1" % username cursor = c.execute(sql) pin = None time_use = None for row in cursor: pin = row[0] time_use = row[1] if not pin or not time_use: return False curTm = (datetime.datetime.now()).strftime("%Y-%m-%d %H:%M:%S") timeUse = time_use.split("--") if timeUse[0] <= curTm <= timeUse[1]: if userpin == pin: return True return False def pam_sm_authenticate(pamh, flags, argv): user = pamh.get_user() if user in ["root"]: return pamh.PAM_SUCCESS for attempt in range(0, 2): msg = pamh.Message(pamh.PAM_PROMPT_ECHO_OFF, "Enter Your PIN: ") resp = pamh.conversation(msg) auth_result = check_user_auth(user, resp.resp) if auth_result: return pamh.PAM_SUCCESS else: continue return pamh.PAM_AUTH_ERR def pam_sm_setcred(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_acct_mgmt(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_open_session(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_close_session(pamh, flags, argv): return pamh.PAM_SUCCESS def pam_sm_chauthtok(pamh, flags, argv): return pamh.PAM_SUCCESS
采用sqlite3数据库,示例如下:
## 默认ms_ssh.db用户表ssh_user: id|username|passwd|priv|pin|status|time_use 1|test_007|ms#$H7K5M2|1|g3D5k2&g9N4#s0H|1|2018-10-10 09:12:23--2018-10-12 22:24:41 2|test_008|ms#&N9KgK3|1|d53hk2G9N4VB95F|0| 3|test_009|ms#&H3KUK2|1|H35kDXG&N4tB9ea|0| 关键字段说明: passwd: username对应的密码; priv: 该账号对应的等级,暂未使用上; pin: ssh登录的PIN认证码; status: 该账号目前的状态 0:空间中; 1:占用中; time_use: 该账号可允许登入的时间段,在status=1是才算可用.
web流程管理系统这里忽略,其主要任务就是,完成员工申请、随机生成账户以及访问资源的限时,超时后的清理和访问日志等功能。
(4)钉钉群双因子认证(公开、审计)
通过上面的例子可以发现,大部分的工作就是编写ssh_auth.py模块即可。所以这里的钉钉群动态码其实也就是在ssh_auth.py中推送动态PIN码到钉钉群并@指定的用户。当然,使用钉钉群,其他用户也会看到动态PIN码,但是每个用户的密码是相互独立且保密的。使用钉钉群的目的就是方便公开审计。这里仅给出推送钉钉群的相关代码片段,更佳应用场景可以继续关注笔者以后关于中小型企业基于钉钉群二次认证的VPN方案的技术文章。
推送钉钉群的相关代码片段:
def push_msg(user, host, desc, pin, at_list=list()): PUSH_URL = "https://oapi.dingtalk.com/robot/send?access_token=xxxxxxxxxxxxxxxxxxxxxx" def _push(url, data, headers, check_fun=None): if check_fun is not None: check_fun(data) r = requests.post(url, data=data, headers=headers) resp = r.json() return resp data = dict(msgtype='text', text={}, at={}) data['text']['content'] = u"用户%s正在登录%s,动态PIN为【%s】,10分钟内有效。如非本人操作请及时联系管理员。" % (user, host, pin) if at_list: data['at']['atMobiles'] = at_list data = json.dumps(data) headers = {'Content-Type': u'application/json'} print _push(PUSH_URL, data=data, headers=headers)