138 lines
3.7 KiB
Python
138 lines
3.7 KiB
Python
#!/usr/bin/python3
|
|
|
|
"""
|
|
This serves lexonomy plugins
|
|
This almost just returns static files,
|
|
it only does search-and-replace for
|
|
$LOCATION$ -> location of plugin
|
|
every plugin is its own folder and if folder is
|
|
"myplugin" and this website's url is "http://example.com",
|
|
then $LOCATION$ gets the value of http://example.com/myplugin
|
|
requirements: flask, redis
|
|
"""
|
|
|
|
import sys
|
|
import os.path
|
|
import mimetypes
|
|
import string
|
|
import random
|
|
|
|
import redis
|
|
from flask import Flask, Response, request
|
|
|
|
app = Flask(__name__)
|
|
redis = redis.Redis(host='localhost', port=6379, db=0)
|
|
|
|
URL = "https://plugins.lexonomy.cjvt.si"
|
|
REPLACE_STRING = "$LOCATION$"
|
|
PLUGINS_DIR = os.environ.get("PLUGINS_DIR_PATH", "plugins")
|
|
|
|
|
|
def check_cache(full_path):
|
|
file_time = int(os.path.getmtime(full_path))
|
|
old_file_time = 0
|
|
status = False
|
|
|
|
if redis.exists(full_path + ":date"):
|
|
try:
|
|
old_file_time = int(redis.get(full_path + ":date"))
|
|
status = old_file_time == file_time
|
|
except ValueError:
|
|
pass
|
|
|
|
redis.set(full_path + ":date", file_time)
|
|
return status
|
|
|
|
|
|
def generate_etag(N=12):
|
|
return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(N))
|
|
|
|
|
|
def return_file(path, plugin=None):
|
|
full_path = path if plugin is None else"{}/{}".format(plugin, path)
|
|
full_path = f"{PLUGINS_DIR}/{full_path}"
|
|
|
|
if not os.path.isfile(full_path):
|
|
return "File not found", 404
|
|
|
|
mt, _encoding = mimetypes.guess_type(path)
|
|
|
|
# if in cache, load etag and content, set status code to 304
|
|
if check_cache(full_path):
|
|
result = redis.get(full_path + ":content")
|
|
etag = str(redis.get(full_path + ":etag"))
|
|
status_code = 304
|
|
|
|
# else, make search-and-replace and save to redis (+etag)
|
|
else:
|
|
status_code = 200
|
|
etag = generate_etag()
|
|
|
|
with open(full_path, 'rb') as fp:
|
|
result = fp.read()
|
|
with open(".err", "a") as fpw:
|
|
print(result, file=fpw)
|
|
print(len(result), file=fpw)
|
|
|
|
if plugin is not None:
|
|
replace_with = "{}/{}".format(URL, plugin)
|
|
try:
|
|
content = result.decode('UTF-8')
|
|
content_replaced = content.replace(REPLACE_STRING, replace_with)
|
|
result = content_replaced.encode('UTF-8')
|
|
except UnicodeDecodeError:
|
|
pass
|
|
|
|
redis.set(full_path + ":content", result)
|
|
redis.set(full_path + ":etag", etag)
|
|
|
|
# set headers
|
|
headers = {
|
|
'Access-Control-Allow-Origin': '*',
|
|
'ETag': etag
|
|
}
|
|
|
|
# if browser does not have a cached etag -> 200
|
|
if 'If-None-Match' not in request.headers:
|
|
status_code = 200
|
|
# then check if it etag is None -> 200
|
|
elif request.headers['If-None-Match'] == "None":
|
|
status_code = 200
|
|
# then check if it is the latest etag, else -> 200
|
|
elif request.headers['If-None-Match'] != etag:
|
|
status_code = 200
|
|
|
|
status_code = 200
|
|
return Response(result, mimetype=mt, headers=headers, status=status_code)
|
|
|
|
|
|
@app.route("/")
|
|
def root():
|
|
result = "<html><body><h2>Description</h2><pre>"
|
|
result += sys.modules[__name__].__doc__
|
|
result += "</pre><h2>Url</h2><p>"
|
|
result += URL
|
|
result += "</p></body></html>"
|
|
return result
|
|
|
|
|
|
@app.route("/plugin-loader.js")
|
|
def plugin_loader():
|
|
return return_file("./plugin-loader.js")
|
|
|
|
|
|
@app.route("/<plugin>")
|
|
def plugin_root(plugin):
|
|
status = "" if os.path.isdir(plugin) else " NOT"
|
|
return "Plugin {} was{} found".format(plugin, status)
|
|
|
|
|
|
@app.route("/<plugin>/<path:path>")
|
|
def plugin_file(plugin, path):
|
|
return return_file(path, plugin=plugin)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
app.run(host="0.0.0.0")
|
|
|