-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathapp.py
executable file
·177 lines (125 loc) · 4.31 KB
/
app.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
from flask import (
Flask,
render_template,
send_file,
url_for,
redirect,
abort,
request,
json
)
import flask_login
from werkzeug.utils import secure_filename
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
from utils import machine_local_ip, storage_init
import os
# Create the storage directory if it does not already exist.
# Use the 'custom_storage_dir' function in the storage_init module to set a
# custom storage directory.
# init() is invoked once and its result reused, instead of re-running it for
# every value read from it.
_storage_info = storage_init.init()
ip_address = machine_local_ip.get_local_ipv4()
port_number = 80
# NOTE(review): the login credentials and the session secret are hard-coded
# here; consider loading them from the environment before deploying.
username = "admin"
password = "admin"
secret = "MohsenFoolad"
# Index 1 is used as the directory uploads are written to; index 0 is passed
# to templates as the directory name.
files_dir = _storage_info[1]
files_dir_name = _storage_info[0]
login_manager = flask_login.LoginManager()
app = Flask(__name__)
app.secret_key = secret
login_manager.init_app(app)
# In-memory user table keyed by login identifier.
users = {username: {'password': password}}
print(app.config['MAX_CONTENT_LENGTH'])
# Default rate limits keyed by the client's remote address; counters are kept
# in process memory.
limiter = Limiter(
    get_remote_address,
    app=app,
    default_limits=["484 per day", "48 per hour"],
    storage_uri="memory://",
)
class User(flask_login.UserMixin):
    """User object for Flask-Login; all behaviour comes from UserMixin."""
@login_manager.user_loader
def user_loader(email):
    """Return a User for a known identifier, or None when it is unknown.

    Registered with Flask-Login as the user loader callback.
    """
    if email in users:
        account = User()
        account.id = email
        return account
    return None
@login_manager.request_loader
def request_loader(request):
    """Authenticate a request from the credentials in its form data.

    Registered with Flask-Login as the request loader callback.

    Args:
        request: The incoming Flask request.

    Returns:
        A User whose id is the submitted 'email' when that identifier is
        known and the submitted 'password' matches; otherwise None.
    """
    email = request.form.get('email')
    account = users.get(email)
    if account is None:
        return None
    # A known identifier alone must not authenticate the request: without
    # this check any client could pass login_required by sending only
    # `email=<valid user>` in the form body.
    if request.form.get('password') != account['password']:
        return None
    user = User()
    user.id = email
    return user
@app.route('/login', methods=['GET', 'POST'])
@limiter.limit("5 per minute")
def login():
    """Show the login form on GET and process submitted credentials otherwise.

    On a successful login the upload page is rendered with the logged-in
    flag set; on failure the login form is rendered again with an error.
    """
    if request.method == 'GET':
        return render_template("login.html")

    submitted_email = request.form['email']
    record = users.get(submitted_email)
    # The password field is only read once the identifier is known.
    if record is None or request.form['password'] != record['password']:
        return render_template("login.html", error="Invalid Data !")

    session_user = User()
    session_user.id = submitted_email
    flask_login.login_user(session_user)
    return render_template(
        'upload.html',
        logged_in=True,
        ip_address=ip_address,
        port_number=port_number,
        files_dir_name=files_dir_name,
    )
@app.route('/protected')
@flask_login.login_required
def protected():
    """Render the file index template; requires a logged-in user."""
    page = render_template("index_files.html")
    return page
@app.route('/logout')
def logout():
    """Log the current user out and render the upload page with a notice."""
    flask_login.logout_user()
    context = {
        "ip_address": ip_address,
        "port_number": port_number,
        "message": "Logged out.",
    }
    return render_template("upload.html", **context)
@login_manager.unauthorized_handler
def unauthorized_handler():
    """Render the page shown when Flask-Login rejects a request."""
    page = render_template("unauth.html")
    return page
@app.route("/")
@app.route("/up")
def upload_func():
    """Render the upload page, flagging it as logged in when applicable.

    Returns:
        The rendered "upload.html" template. `logged_in=True` is passed only
        for an authenticated user; otherwise the variable is left unset.
    """
    context = {"ip_address": ip_address, "port_number": port_number}
    # Truthiness test instead of comparing against the False literal.
    if flask_login.current_user.is_authenticated:
        context["logged_in"] = True
    return render_template("upload.html", **context)
@app.route("/uploader", methods=["GET", "POST"])
def uploader():
    """Store an uploaded file in the storage directory.

    On POST, the 'file' part of the request is written into `files_dir`.
    The client-supplied filename is reduced to its final path component so
    an upload cannot be written outside the storage directory.

    Returns:
        "upload.html" with a message when no file was selected, the name is
        unusable, or saving fails; otherwise "up_done.html".
    """
    if request.method == "POST":
        if 'file' not in request.files:
            return render_template("upload.html", message="No selected file")
        file = request.files["file"]
        if file.filename == "":
            return render_template("upload.html", message="No selected file")

        # The filename is attacker-controlled: strip any directory part
        # (either separator style) and refuse names that resolve to a
        # directory reference. Unlike werkzeug's secure_filename, this keeps
        # non-ASCII characters in legitimate names.
        safe_name = os.path.basename(file.filename.replace("\\", "/"))
        if safe_name in ("", ".", ".."):
            return render_template("upload.html", message="Invalid file name")

        file_path = os.path.join(files_dir, safe_name)
        try:
            print("Started saving file ... ")
            # Copy the upload to disk in fixed-size chunks so the whole
            # file is never held in memory at once.
            with open(file_path, "wb") as f:
                while True:
                    chunk = file.stream.read(64 * 1024)
                    if not chunk:
                        break
                    f.write(chunk)
            print("File saved successfully.")
        except (OSError, ValueError) as e:
            # ValueError covers names the OS layer rejects outright
            # (e.g. an embedded NUL byte).
            return render_template("upload.html", message=f"Error: {str(e)}")
    # NOTE(review): a plain GET also ends up here and renders the "done"
    # page; kept as-is because the intended GET behaviour is unclear.
    return render_template("up_done.html")
@app.route("/<path:req_path>")
@flask_login.login_required
def index_files_func(req_path):
    """Serve a file, or list a directory, located under the parent directory.

    Args:
        req_path: Path from the URL, interpreted relative to "../".

    Returns:
        The file contents when the path is a file, otherwise the rendered
        "index_files.html" listing of the directory's entries.

    Raises:
        404 (via abort) when the path does not exist or resolves outside
        the base directory.
    """
    try:
        base_dir = os.path.realpath("../")
        # Resolve "..", absolute components and symlinks before checking
        # containment, so the request cannot escape base_dir.
        abs_path = os.path.realpath(os.path.join(base_dir, req_path))
        inside_base = os.path.commonpath([base_dir, abs_path]) == base_dir
    except ValueError:
        # Raised for unusable paths (e.g. embedded NUL, mixed drives).
        inside_base = False

    # Respond with 404 instead of echoing the path back: the echo disclosed
    # server paths and reflected user input into the response body.
    if not inside_base or not os.path.exists(abs_path):
        abort(404)

    if os.path.isfile(abs_path):
        # abs_path is absolute, so send_file reads the same file that the
        # checks above inspected.
        return send_file(abs_path)

    files = os.listdir(abs_path)
    return render_template("index_files.html", files=files)
if __name__ == "__main__":
    # Debug mode is no longer forced on: the interactive debugger allows
    # code execution from the browser and this server listens on a
    # network-reachable address. With no `debug` argument, Flask.run leaves
    # debug off unless FLASK_DEBUG is set in the environment.
    app.run(host=ip_address, port=port_number)