run_server.py 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366
  1. import datetime
  2. import errno
  3. import json
  4. import os
  5. import random
  6. import re
  7. import sys
  8. import time
  9. from json import JSONDecodeError
  10. from logging import INFO
  11. from threading import Thread
  12. from typing import Dict, Any
  13. import bottle
  14. # noinspection PyUnresolvedReferences
  15. from bottle.ext.websocket import GeventWebSocketServer
  16. # noinspection PyUnresolvedReferences
  17. from bottle.ext.websocket import websocket
  18. from gevent import threading
  19. from gevent.queue import Queue, Empty
  20. from gevent.threading import Lock
  21. from geventwebsocket import WebSocketError
  22. from geventwebsocket.websocket import WebSocket
  23. import connection
  24. import model
  25. import server_controller
  26. from connection import HttpError
  27. from debug import debug
  28. from game import ROOT_URL, COPYRIGHT_INFRINGEMENT_PROBABILITY, DB_NAME, logger
  29. from lib.print_exc_plus import print_exc_plus
  30. from lib.threading_timer_decorator import exit_after
  31. from routes import valid_post_routes, upload_filtered
  32. from util import round_to_n, rename, profile_wall_time_instead_if_profiling, LogicError
  33. FRONTEND_RELATIVE_PATH = './frontend'
  34. profile_wall_time_instead_if_profiling()
  35. request_lock = Lock() # locked until the response to the request is computed
  36. db_commit_threads = Queue()
  37. if debug:
  38. TIMEOUT = 600
  39. else:
  40. TIMEOUT = 10
  41. assert all(getattr(server_controller, route) for route in valid_post_routes)
  42. def reset_global_variables():
  43. model.current_connection = None
  44. model.current_cursor = None
  45. model.current_db_name = None
  46. model.current_user_id = None
  47. del connection.push_message_queue[:]
  48. bottle.response.status = 500
  49. @exit_after(TIMEOUT)
  50. def call_controller_method_with_timeout(method, json_request: Dict[str, Any]):
  51. return method(json_request)
  52. def _process(path, json_request):
  53. start = time.clock()
  54. path = path.strip().lower()
  55. bottle.response.content_type = 'application/json; charset=latin-1'
  56. reset_global_variables()
  57. original_request = None
  58. # noinspection PyBroadException
  59. try:
  60. json_request = json_request()
  61. original_request = json_request
  62. logger.log(path, INFO, message_type='handling_http_request', data=json.dumps({
  63. 'request': json_request,
  64. 'start': start,
  65. }))
  66. if json_request is None:
  67. bottle.response.status = 400
  68. resp = connection.BadRequest('Only json allowed.')
  69. elif path not in valid_post_routes and not debug:
  70. print('Processing time:', time.clock() - start)
  71. resp = connection.NotFound('URL not available')
  72. else:
  73. model.connect(DB_NAME, create_if_not_exists=True)
  74. if path not in valid_post_routes:
  75. if not debug:
  76. raise LogicError
  77. method_to_call = server_controller._mocked_route
  78. else:
  79. method_to_call = getattr(server_controller, path)
  80. try:
  81. resp = call_controller_method_with_timeout(method_to_call, json_request)
  82. if isinstance(resp, HttpError):
  83. raise resp
  84. raise connection.Success(resp)
  85. except HttpError as e:
  86. bottle.response.status = e.code
  87. resp = e
  88. if not isinstance(resp.body, dict):
  89. raise TypeError('The response body should always be a dict')
  90. if resp.code // 100 == 2 and path in upload_filtered and random.random() < COPYRIGHT_INFRINGEMENT_PROBABILITY:
  91. resp = connection.UnavailableForLegalReasons('An upload filter detected a copyright infringement. '
  92. 'If you think this is an error, please try again.')
  93. bottle.response.status = resp.code
  94. if model.current_connection is not None:
  95. if bottle.response.status_code == 200:
  96. thread = Thread(target=finish_request, args=[], kwargs={'success': True}, daemon=False)
  97. else:
  98. thread = Thread(target=finish_request, args=[], kwargs={'success': False}, daemon=False)
  99. db_commit_threads.put(thread)
  100. thread.start()
  101. print('route=' + path, 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  102. 'db=' + str(model.current_db_name))
  103. logger.log(path, INFO, message_type='http_request_finished', data=json.dumps({
  104. 'request': json_request,
  105. 'response': resp.body,
  106. 'status': resp.code,
  107. 'start': start,
  108. 'end': time.clock(),
  109. }))
  110. return resp.body
  111. except JSONDecodeError:
  112. return handle_error('Unable to decode JSON', path, start, original_request)
  113. except NotImplementedError:
  114. return handle_error('This feature has not been fully implemented yet.', path, start, original_request)
  115. except KeyboardInterrupt:
  116. if time.clock() - start > TIMEOUT:
  117. return handle_error('Processing timeout', path, start, original_request)
  118. else:
  119. raise
  120. except Exception:
  121. return handle_error('Unknown error', path, start, original_request)
  122. def finish_request(success):
  123. if success:
  124. model.current_connection.commit()
  125. connection.push_messages_in_queue()
  126. else:
  127. model.current_connection.rollback()
  128. if __name__ == '__main__':
  129. print('sqlite3.version', model.db.version)
  130. if debug:
  131. print('Running server in debug mode...')
  132. print('Preparing backend API...')
  133. @bottle.route('/json/<path>', method='POST')
  134. def process(path):
  135. with request_lock:
  136. wait_for_db_commit_threads()
  137. return _process(path, lambda: bottle.request.json)
  138. def wait_for_db_commit_threads():
  139. while len(db_commit_threads) > 0:
  140. try:
  141. t = db_commit_threads.get()
  142. except Empty:
  143. break
  144. t.join()
  145. print('Preparing index page...')
  146. @bottle.route('/', method='GET')
  147. def index():
  148. if ROOT_URL != '/':
  149. bottle.redirect(ROOT_URL)
  150. def handle_error(message, path, start, request, status=500):
  151. bottle.response.status = status
  152. print_exc_plus()
  153. if model.current_connection is not None:
  154. model.current_connection.rollback()
  155. print('route=' + str(path), 't=' + str(round_to_n(time.clock() - start, 4)) + 's,',
  156. 'db=' + str(model.current_db_name))
  157. logger.exception(path, message_type='http_request', data=json.dumps({
  158. 'status': status,
  159. 'start': start,
  160. 'end': time.clock(),
  161. 'exception': str(sys.exc_info()),
  162. 'request': request,
  163. }))
  164. return connection.InternalServerError(message).body
  165. print('Preparing websocket connections...')
  166. @bottle.get('/websocket', apply=[websocket])
  167. def websocket(ws: WebSocket):
  168. print('websocket connection', *ws.handler.client_address, datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
  169. while True:
  170. start = time.clock()
  171. path = None
  172. request_token = None
  173. outer_json = None
  174. # noinspection PyBroadException
  175. try:
  176. if ws.closed:
  177. connection.ws_cleanup(ws)
  178. break
  179. try:
  180. msg = ws.read_message()
  181. except ConnectionResetError:
  182. msg = None
  183. except WebSocketError as e:
  184. if e.args[0] == 'Unexpected EOF while decoding header':
  185. msg = None
  186. else:
  187. raise
  188. if msg is not None: # received some message
  189. with request_lock:
  190. wait_for_db_commit_threads()
  191. msg = bytes(msg)
  192. outer_json = None
  193. outer_json = bottle.json_loads(msg)
  194. path = outer_json['route']
  195. inner_json = outer_json['body']
  196. request_token = outer_json['request_token']
  197. inner_result_json = _process(path, lambda: inner_json)
  198. if 'error' in inner_result_json:
  199. status_code = int(inner_result_json['error'][:3])
  200. else:
  201. status_code = 200
  202. if model.current_user_id is not None and status_code == 200:
  203. # if there is a user_id involved, associate it with this websocket
  204. user_id = (model.current_db_name, model.current_user_id)
  205. if user_id in connection.websockets_for_user:
  206. if ws not in connection.websockets_for_user[user_id]:
  207. connection.websockets_for_user[user_id].append(ws)
  208. else:
  209. connection.websockets_for_user[user_id] = [ws]
  210. if ws in connection.users_for_websocket:
  211. if user_id not in connection.users_for_websocket[ws]:
  212. connection.users_for_websocket[ws].append(user_id)
  213. else:
  214. connection.users_for_websocket[ws] = [user_id]
  215. outer_result_json = {
  216. 'body': inner_result_json,
  217. 'http_status_code': status_code,
  218. 'request_token': request_token
  219. }
  220. outer_result_json = json.dumps(outer_result_json)
  221. if ws.closed:
  222. connection.ws_cleanup(ws)
  223. break
  224. ws.send(outer_result_json)
  225. print('websocket message',
  226. *ws.handler.client_address,
  227. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  228. status_code,
  229. len(outer_result_json))
  230. else:
  231. connection.ws_cleanup(ws)
  232. break
  233. except JSONDecodeError:
  234. inner_result_json = handle_error('Unable to decode outer JSON', path, start, outer_json)
  235. status_code = 403
  236. inner_result_json['http_status_code'] = status_code
  237. if request_token is not None:
  238. inner_result_json['request_token'] = request_token
  239. inner_result_json = json.dumps(inner_result_json)
  240. if ws.closed:
  241. connection.ws_cleanup(ws)
  242. break
  243. ws.send(inner_result_json)
  244. print('websocket message',
  245. *ws.handler.client_address,
  246. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  247. status_code,
  248. len(inner_result_json))
  249. except Exception:
  250. inner_result_json = handle_error('Unknown error', path, start, outer_json)
  251. status_code = 500
  252. inner_result_json['http_status_code'] = status_code
  253. if request_token is not None:
  254. inner_result_json['request_token'] = request_token
  255. inner_result_json = json.dumps(inner_result_json)
  256. if ws.closed:
  257. connection.ws_cleanup(ws)
  258. break
  259. ws.send(inner_result_json)
  260. print('websocket message',
  261. *ws.handler.client_address,
  262. datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
  263. status_code,
  264. len(inner_result_json))
  265. def _serve_static_directory(route, root, download=False):
  266. method_name = ''.join(c for c in root if re.match(r'[A-Za-z]]', c))
  267. assert method_name not in globals()
  268. @bottle.route(route, method=['GET', 'OPTIONS'])
  269. @rename(''.join(c for c in root if re.match(r'[A-Za-z]]', c)))
  270. def serve_static_file(filename):
  271. # start = time.clock()
  272. # logger.log(filename, INFO, message_type='handling_http_request', data=json.dumps({
  273. # 'start': start,
  274. # }))
  275. # try:
  276. if filename == 'api.json':
  277. return {'endpoint': bottle.request.urlparts[0] + '://' + bottle.request.urlparts[1] + '/json/'}
  278. if download:
  279. default_name = 'ytm-' + filename
  280. return bottle.static_file(filename, root=root, download=default_name)
  281. else:
  282. return bottle.static_file(filename, root=root, download=False)
  283. # finally:
  284. # logger.log(filename, INFO, message_type='http_request_finished', data=json.dumps({
  285. # 'status': bottle.response.status_code,
  286. # 'start': start,
  287. # 'end': time.clock(),
  288. # }))
  289. # frontend
  290. print('Preparing frontend directories...')
  291. if len(os.listdir(FRONTEND_RELATIVE_PATH)) == 0:
  292. raise FileNotFoundError(errno.ENOENT, 'Frontend directory is empty:', FRONTEND_RELATIVE_PATH)
  293. for subdir, dirs, files in os.walk(FRONTEND_RELATIVE_PATH):
  294. # subdir now has the form ../frontend/config
  295. _serve_static_directory(
  296. route=subdir.replace('\\', '/').replace(FRONTEND_RELATIVE_PATH, '') + '/<filename>',
  297. root=subdir
  298. )
  299. # app
  300. print('Preparing app for download...')
  301. _serve_static_directory(
  302. route='/app/<filename>',
  303. root='../android/app/release',
  304. download=True,
  305. )
  306. logger.log('Server start', INFO, 'server_start', json.dumps({
  307. 'host': '0.0.0.0',
  308. 'port': connection.PORT,
  309. 'debug': debug,
  310. }))
  311. # commit regularly
  312. log_commit_time = logger.commit()
  313. log_commit_delay = 15
  314. print(f'Committing logfile transaction took {log_commit_time}s, '
  315. f'scheduling to run every {log_commit_delay}s')
  316. threading.Timer(log_commit_delay, logger.commit).start()
  317. print('Running server...')
  318. bottle.run(host='0.0.0.0', port=connection.PORT, debug=debug, server=GeventWebSocketServer)
  319. logger.commit()
  320. model.cleanup()