1
1

run_server.py 14 KB


  1. import datetime
  2. import json
  3. import os
  4. import random
  5. import re
  6. import sys
  7. import time
  8. from json import JSONDecodeError
  9. from logging import INFO
  10. from threading import Thread
  11. from typing import Dict, Any
  12. import bottle
  13. # noinspection PyUnresolvedReferences
  14. from bottle.ext.websocket import GeventWebSocketServer
  15. # noinspection PyUnresolvedReferences
  16. from bottle.ext.websocket import websocket
  17. from gevent import threading
  18. from gevent.queue import Queue, Empty
  19. from gevent.threading import Lock
  20. from geventwebsocket import WebSocketError
  21. from geventwebsocket.websocket import WebSocket
  22. import connection
  23. import model
  24. import server_controller
  25. from application import ROOT_URL, COPYRIGHT_INFRINGEMENT_PROBABILITY, DB_NAME, logger
  26. from connection import HttpError
  27. from debug import debug
  28. from lib.print_exc_plus import print_exc_plus
  29. from lib.threading_timer_decorator import exit_after
  30. from routes import valid_post_routes, upload_filtered
  31. from util import round_to_n, rename, profile_wall_time_instead_if_profiling
  32. FRONTEND_RELATIVE_PATH = '../frontend'
  33. profile_wall_time_instead_if_profiling()
  34. request_lock = Lock() # locked until the response to the request is computed
  35. db_commit_threads = Queue()
  36. if debug:
  37. TIMEOUT = 600
  38. else:
  39. TIMEOUT = 10
  40. assert all(getattr(server_controller, route) for route in valid_post_routes)
  41. def reset_global_variables():
  42. model.current_connection = None
  43. model.current_cursor = None
  44. model.current_db_name = None
  45. model.current_user_id = None
  46. del connection.push_message_queue[:]
  47. bottle.response.status = 500
  48. @exit_after(TIMEOUT)
  49. def call_controller_method_with_timeout(method, json_request: Dict[str, Any]):
  50. return method(json_request)
  51. def _process(path, json_request):
  52. start = time.clock()
  53. path = path.strip().lower()
  54. bottle.response.content_type = 'application/json; charset=latin-1'
  55. reset_global_variables()
  56. original_request = None
  57. # noinspection PyBroadException
  58. try:
  59. json_request = json_request()
  60. original_request = json_request
  61. logger.log(path, INFO, message_type='handling_http_request', data=json.dumps({
  62. 'request': json_request,
  63. 'start': start,
  64. }))
  65. if json_request is None:
  66. bottle.response.status = 400
  67. resp = connection.BadRequest('Only json allowed.')
  68. elif path not in valid_post_routes:
  69. print('Processing time:', time.clock() - start)
  70. resp = connection.NotFound('URL not available')
  71. else:
  72. model.connect(DB_NAME, create_if_not_exists=True)
  73. method_to_call = getattr(server_controller, path)
  74. try:
  75. resp = call_controller_method_with_timeout(method_to_call, json_request)
  76. raise connection.Success(resp)
  77. except HttpError as e:
  78. bottle.response.status = e.code
  79. resp = e
  80. if not isinstance(resp.body, dict):
  81. raise TypeError('The response body should always be a dict')
  82. if resp.code // 100 == 2 and path in upload_filtered and random.random() < COPYRIGHT_INFRINGEMENT_PROBABILITY:
  83. resp = connection.UnavailableForLegalReasons('An upload filter detected a copyright infringement. '
  84. 'If you think this is an error, please try again.')
  85. bottle.response.status = resp.code
  86. if model.current_connection is not None:
  87. if bottle.response.status_code == 200:
  88. thread = Thread(target=finish_request, args=[], kwargs={'success': True}, daemon=False)
  89. else:
  90. thread = Thread(target=finish_request, args=[], kwargs={'success': False}, daemon=False)
  91. db_commit_threads.put(thread)
  92. thread.start()
  93. print('route=' + path, 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  94. 'db=' + str(model.current_db_name))
  95. logger.log(path, INFO, message_type='http_request_finished', data=json.dumps({
  96. 'request': json_request,
  97. 'response': resp.body,
  98. 'status': resp.code,
  99. 'start': start,
  100. 'end': time.clock(),
  101. }))
  102. return resp.body
  103. except JSONDecodeError:
  104. return handle_error('Unable to decode JSON', path, start, original_request)
  105. except NotImplementedError:
  106. return handle_error('This feature has not been fully implemented yet.', path, start, original_request)
  107. except KeyboardInterrupt:
  108. if time.clock() - start > TIMEOUT:
  109. return handle_error('Processing timeout', path, start, original_request)
  110. else:
  111. raise
  112. except Exception:
  113. return handle_error('Unknown error', path, start, original_request)
  114. def finish_request(success):
  115. if success:
  116. model.current_connection.commit()
  117. connection.push_messages_in_queue()
  118. else:
  119. model.current_connection.rollback()
  120. if __name__ == '__main__':
  121. print('sqlite3.version', model.db.version)
  122. if debug:
  123. print('Running server in debug mode...')
  124. print('Preparing backend API...')
  125. @bottle.route('/json/<path>', method='POST')
  126. def process(path):
  127. with request_lock:
  128. wait_for_db_commit_threads()
  129. return _process(path, lambda: bottle.request.json)
  130. def wait_for_db_commit_threads():
  131. while len(db_commit_threads) > 0:
  132. try:
  133. t = db_commit_threads.get()
  134. except Empty:
  135. break
  136. t.join()
  137. print('Preparing index page...')
  138. @bottle.route('/', method='GET')
  139. def index():
  140. if ROOT_URL != '/':
  141. bottle.redirect(ROOT_URL)
  142. def handle_error(message, path, start, request, status=500):
  143. bottle.response.status = status
  144. print_exc_plus()
  145. if model.current_connection is not None:
  146. model.current_connection.rollback()
  147. print('route=' + str(path), 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  148. 'db=' + str(model.current_db_name))
  149. logger.exception(path, message_type='http_request', data=json.dumps({
  150. 'status': status,
  151. 'start': start,
  152. 'end': time.clock(),
  153. 'exception': str(sys.exc_info()),
  154. 'request': request,
  155. }))
  156. return connection.InternalServerError(message).body
  157. print('Preparing websocket connections...')
  158. @bottle.get('/websocket', apply=[websocket])
  159. def websocket(ws: WebSocket):
  160. print('websocket connection', *ws.handler.client_address, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  161. while True:
  162. start = time.clock()
  163. path = None
  164. request_token = None
  165. outer_json = None
  166. # noinspection PyBroadException
  167. try:
  168. if ws.closed:
  169. connection.ws_cleanup(ws)
  170. break
  171. try:
  172. msg = ws.read_message()
  173. except ConnectionResetError:
  174. msg = None
  175. except WebSocketError as e:
  176. if e.args[0] == 'Unexpected EOF while decoding header':
  177. msg = None
  178. else:
  179. raise
  180. if msg is not None: # received some message
  181. with request_lock:
  182. wait_for_db_commit_threads()
  183. msg = bytes(msg)
  184. outer_json = None
  185. outer_json = bottle.json_loads(msg)
  186. path = outer_json['route']
  187. inner_json = outer_json['body']
  188. request_token = outer_json['request_token']
  189. inner_result_json = _process(path, lambda: inner_json)
  190. if 'error' in inner_result_json:
  191. status_code = int(inner_result_json['error'][:3])
  192. else:
  193. status_code = 200
  194. if model.current_user_id is not None and status_code == 200:
  195. # if there is a user_id involved, associate it with this websocket
  196. user_id = (model.current_db_name, model.current_user_id)
  197. if user_id in connection.websockets_for_user:
  198. if ws not in connection.websockets_for_user[user_id]:
  199. connection.websockets_for_user[user_id].append(ws)
  200. else:
  201. connection.websockets_for_user[user_id] = [ws]
  202. if ws in connection.users_for_websocket:
  203. if user_id not in connection.users_for_websocket[ws]:
  204. connection.users_for_websocket[ws].append(user_id)
  205. else:
  206. connection.users_for_websocket[ws] = [user_id]
  207. outer_result_json = {
  208. 'body': inner_result_json,
  209. 'http_status_code': status_code,
  210. 'request_token': request_token
  211. }
  212. outer_result_json = json.dumps(outer_result_json)
  213. if ws.closed:
  214. connection.ws_cleanup(ws)
  215. break
  216. ws.send(outer_result_json)
  217. print('websocket message',
  218. *ws.handler.client_address,
  219. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  220. status_code,
  221. len(outer_result_json))
  222. else:
  223. connection.ws_cleanup(ws)
  224. break
  225. except JSONDecodeError:
  226. inner_result_json = handle_error('Unable to decode outer JSON', path, start, outer_json)
  227. status_code = 403
  228. inner_result_json['http_status_code'] = status_code
  229. if request_token is not None:
  230. inner_result_json['request_token'] = request_token
  231. inner_result_json = json.dumps(inner_result_json)
  232. if ws.closed:
  233. connection.ws_cleanup(ws)
  234. break
  235. ws.send(inner_result_json)
  236. print('websocket message',
  237. *ws.handler.client_address,
  238. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  239. status_code,
  240. len(inner_result_json))
  241. except Exception:
  242. inner_result_json = handle_error('Unknown error', path, start, outer_json)
  243. status_code = 500
  244. inner_result_json['http_status_code'] = status_code
  245. if request_token is not None:
  246. inner_result_json['request_token'] = request_token
  247. inner_result_json = json.dumps(inner_result_json)
  248. if ws.closed:
  249. connection.ws_cleanup(ws)
  250. break
  251. ws.send(inner_result_json)
  252. print('websocket message',
  253. *ws.handler.client_address,
  254. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  255. status_code,
  256. len(inner_result_json))
  257. def _serve_static_directory(route, root, download=False):
  258. method_name = ''.join(c for c in root if re.match(r'[A-Za-z]]', c))
  259. assert method_name not in globals()
  260. @bottle.route(route, method=['GET', 'OPTIONS'])
  261. @rename(''.join(c for c in root if re.match(r'[A-Za-z]]', c)))
  262. def serve_static_file(filename):
  263. # start = time.clock()
  264. # logger.log(filename, INFO, message_type='handling_http_request', data=json.dumps({
  265. # 'start': start,
  266. # }))
  267. # try:
  268. if filename == 'api.json':
  269. return {'endpoint': bottle.request.urlparts[0] + '://' + bottle.request.urlparts[1] + '/json/'}
  270. if download:
  271. default_name = 'ytm-' + filename
  272. return bottle.static_file(filename, root=root, download=default_name)
  273. else:
  274. return bottle.static_file(filename, root=root, download=False)
  275. # finally:
  276. # logger.log(filename, INFO, message_type='http_request_finished', data=json.dumps({
  277. # 'status': bottle.response.status_code,
  278. # 'start': start,
  279. # 'end': time.clock(),
  280. # }))
  281. # frontend
  282. print('Preparing frontend directories...')
  283. for subdir, dirs, files in os.walk(FRONTEND_RELATIVE_PATH):
  284. # subdir now has the form ../frontend/config
  285. _serve_static_directory(
  286. route=subdir.replace('\\', '/').replace(FRONTEND_RELATIVE_PATH, '') + '/<filename>',
  287. root=subdir
  288. )
  289. # app
  290. print('Preparing app for download...')
  291. _serve_static_directory(
  292. route='/app/<filename>',
  293. root='../android/app/release',
  294. download=True,
  295. )
  296. logger.log('Server start', INFO, 'server_start', json.dumps({
  297. 'host': '0.0.0.0',
  298. 'port': connection.PORT,
  299. 'debug': debug,
  300. }))
  301. # commit regularly
  302. log_commit_time = logger.commit()
  303. log_commit_delay = 15
  304. print(f'Committing logfile transaction took {log_commit_time}s, '
  305. f'scheduling to run every {log_commit_delay}s')
  306. threading.Timer(log_commit_delay, logger.commit).start()
  307. print('Running server...')
  308. bottle.run(host='0.0.0.0', port=connection.PORT, debug=debug, server=GeventWebSocketServer)
  309. logger.commit()
  310. model.cleanup()