138 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			138 lines
		
	
	
		
			3.7 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| #!/usr/bin/python3
 | |
| 
 | |
| """
 | |
| This serves lexonomy plugins
 | |
| This almost just returns static files, 
 | |
| it only does search-and-replace for
 | |
| $LOCATION$ -> location of plugin
 | |
| every plugin is its own folder and if folder is
 | |
| "myplugin" and this website's url is "http://example.com",
 | |
| then $LOCATION$ gets the value of http://example.com/myplugin
 | |
| requirements: flask, redis
 | |
| """
 | |
| 
 | |
| import sys
 | |
| import os.path
 | |
| import mimetypes
 | |
| import string
 | |
| import random
 | |
| 
 | |
| import redis
 | |
| from flask import Flask, Response, request
 | |
| 
 | |
| app = Flask(__name__)
 | |
| redis = redis.Redis(host='localhost', port=6379, db=0)
 | |
| 
 | |
| URL = "https://plugins.lexonomy.cjvt.si"
 | |
| REPLACE_STRING = "$LOCATION$"
 | |
| PLUGINS_DIR = os.environ.get("PLUGINS_DIR_PATH", "plugins")
 | |
| 
 | |
| 
 | |
| def check_cache(full_path):
 | |
|     file_time = int(os.path.getmtime(full_path))
 | |
|     old_file_time = 0
 | |
|     status = False
 | |
| 
 | |
|     if redis.exists(full_path + ":date"):
 | |
|         try:
 | |
|             old_file_time = int(redis.get(full_path + ":date"))
 | |
|             status = old_file_time == file_time
 | |
|         except ValueError:
 | |
|             pass
 | |
| 
 | |
|     redis.set(full_path + ":date", file_time)
 | |
|     return status
 | |
| 
 | |
| 
 | |
| def generate_etag(N=12):
 | |
|     return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(N))
 | |
| 
 | |
| 
 | |
| def return_file(path, plugin=None):
 | |
|     full_path = path if plugin is None else"{}/{}".format(plugin, path)
 | |
|     full_path = f"{PLUGINS_DIR}/{full_path}"
 | |
| 
 | |
|     if not os.path.isfile(full_path):
 | |
|         return "File not found", 404
 | |
| 
 | |
|     mt, _encoding = mimetypes.guess_type(path)
 | |
| 
 | |
|     # if in cache, load etag and content, set status code to 304
 | |
|     if check_cache(full_path):
 | |
|         result = redis.get(full_path + ":content")
 | |
|         etag = str(redis.get(full_path + ":etag"))
 | |
|         status_code = 304
 | |
| 
 | |
|     # else, make search-and-replace and save to redis (+etag)
 | |
|     else:
 | |
|         status_code = 200
 | |
|         etag = generate_etag()
 | |
| 
 | |
|         with open(full_path, 'rb') as fp:
 | |
|             result = fp.read()
 | |
|             with open(".err", "a") as fpw:
 | |
|                 print(result, file=fpw)
 | |
|                 print(len(result), file=fpw)
 | |
| 
 | |
|         if plugin is not None:
 | |
|             replace_with = "{}/{}".format(URL, plugin)
 | |
|             try:
 | |
|                 content = result.decode('UTF-8')
 | |
|                 content_replaced = content.replace(REPLACE_STRING, replace_with)
 | |
|                 result = content_replaced.encode('UTF-8')
 | |
|             except UnicodeDecodeError:
 | |
|                 pass
 | |
| 
 | |
|         redis.set(full_path + ":content", result)
 | |
|         redis.set(full_path + ":etag", etag)
 | |
| 
 | |
|     # set headers
 | |
|     headers = {
 | |
|         'Access-Control-Allow-Origin': '*',
 | |
|         'ETag': etag
 | |
|     }
 | |
| 
 | |
|     # if browser does not have a cached etag -> 200
 | |
|     if 'If-None-Match' not in request.headers:
 | |
|         status_code = 200
 | |
|     # then check if it etag is None -> 200
 | |
|     elif request.headers['If-None-Match'] == "None":
 | |
|         status_code = 200
 | |
|     # then check if it is the latest etag, else -> 200
 | |
|     elif request.headers['If-None-Match'] != etag:
 | |
|         status_code = 200
 | |
| 
 | |
|     status_code = 200
 | |
|     return Response(result, mimetype=mt, headers=headers, status=status_code)
 | |
| 
 | |
| 
 | |
| @app.route("/")
 | |
| def root():
 | |
|     result = "<html><body><h2>Description</h2><pre>"
 | |
|     result += sys.modules[__name__].__doc__
 | |
|     result += "</pre><h2>Url</h2><p>"
 | |
|     result += URL
 | |
|     result += "</p></body></html>"
 | |
|     return result
 | |
| 
 | |
| 
 | |
| @app.route("/plugin-loader.js")
 | |
| def plugin_loader():
 | |
|     return return_file("./plugin-loader.js")
 | |
| 
 | |
| 
 | |
| @app.route("/<plugin>")
 | |
| def plugin_root(plugin):
 | |
|     status = "" if os.path.isdir(plugin) else " NOT"
 | |
|     return "Plugin {} was{} found".format(plugin, status)
 | |
| 
 | |
| 
 | |
| @app.route("/<plugin>/<path:path>")
 | |
| def plugin_file(plugin, path):
 | |
|     return return_file(path, plugin=plugin)
 | |
| 
 | |
| 
 | |
| if __name__ == '__main__':
 | |
|     app.run(host="0.0.0.0")
 | |
| 
 |