import json
import random
import re
import sys
from datetime import datetime
from time import perf_counter
from typing import Dict, Callable
from uuid import uuid4

import requests

import connection
import test.do_some_requests.current_websocket
from test import failed_requests
from util import round_to_n

DEFAULT_PW = 'pw'
PORT = connection.PORT
HOST = 'http://127.0.0.1' + ':' + str(PORT)
# HOST = 'http://koljastrohm-games.com' + ':' + str(PORT)
JSON_HEADERS = {'Content-type': 'application/json'}
# When True, the first request that returns a status other than 200/451
# terminates the process with that status as exit code.
EXIT_ON_FAILED_REQUEST = True

# Websocket answers that arrived while waiting for a different request token,
# keyed by their 'request_token'.
response_collection: Dict[str, Dict] = {}

default_request_method: Callable[[str, Dict], Dict]


def _current_websocket():
    """Return the websocket object currently stored in the current_websocket module."""
    return test.do_some_requests.current_websocket.current_websocket


def receive_answer(token):
    """Wait until the server sends an answer that contains the desired request token.

    Answers carrying a different request token are stored in
    ``response_collection`` for later retrieval; answers without any token
    are only printed.

    :param token: the 'request_token' value of the answer to wait for
    :return: the decoded JSON answer whose 'request_token' equals ``token``
    """
    if token in response_collection:
        # The answer already arrived while we were waiting for another one.
        return response_collection.pop(token)

    json_content = {}
    while 'request_token' not in json_content or json_content['request_token'] != token:
        if 'request_token' in json_content:
            # Answer to some other request: keep it for whoever asks for it later.
            response_collection[json_content['request_token']] = json_content
        received = _current_websocket().recv_data_frame()[1].data
        content = received.decode('utf-8')
        # Put every flat JSON object on its own line to make the log readable.
        formatted_content = re.sub(r'{([^}]*?):(.*?)}', r'\n{\g<1>:\g<2>}', content)
        print('Received through websocket: ' + formatted_content)
        json_content = json.loads(content)

    return json_content


def websocket_request(route: str, data: Dict) -> Dict:
    """Send a request over the websocket and return the 'body' of its answer.

    Connects the websocket to ``HOST`` (with a ``ws://`` scheme) under
    ``/websocket`` first if it is not connected yet. A 451 answer triggers a
    retry of the same request. Any other non-200 status either terminates the
    process (if ``EXIT_ON_FAILED_REQUEST``) or is recorded in
    ``failed_requests``.

    :param route: name of the server route to call
    :param data: request body, must be JSON serializable
    :return: the 'body' entry of the server's answer
    """
    original_data = data
    if not _current_websocket().connected:
        ws_host = HOST.replace('http://', 'ws://')
        _current_websocket().connect(ws_host + '/websocket')

    token = str(uuid4())
    data = json.dumps({'route': route, 'body': data, 'request_token': token})
    print('Sending to websocket:', str(data).replace('{', '\n{')[1:])
    _current_websocket().send(data, opcode=2)
    json_content = receive_answer(token)
    print()

    status_code = json_content['http_status_code']
    if status_code == 200:
        pass
    elif status_code == 451:
        # Copyright problems, likely a bug in the upload filter
        # Try again
        return websocket_request(route, original_data)
    else:
        if EXIT_ON_FAILED_REQUEST:
            # Close the connection before exiting. The original condition was
            # inverted (`not connected`) and therefore never closed an open
            # socket; main() uses the same non-inverted check.
            if _current_websocket().connected:
                _current_websocket().close()
            sys.exit(status_code)
        failed_requests.append((route, status_code))
    return json_content['body']


def http_request(route: str, data: Dict) -> Dict:
    """POST a JSON request to ``HOST/json/<route>`` and return the decoded answer.

    A 451 response triggers a retry of the same request. Any other non-200
    status either terminates the process (if ``EXIT_ON_FAILED_REQUEST``) or is
    recorded in ``failed_requests``.

    :param route: name of the server route to call
    :param data: request body, must be JSON serializable
    :return: the JSON-decoded response content
    """
    original_data = data
    data = json.dumps(data)
    print('Sending to /' + route + ':', str(data).replace('{', '\n{')[1:])
    r = requests.post(HOST + '/json/' + route, data=data, headers=JSON_HEADERS)
    content = r.content.decode()
    print('Request returned: ' + content.replace('{', '\n{'))
    print()

    if r.status_code == 200:
        pass
    elif r.status_code == 451:
        return http_request(route, original_data)
    else:
        if EXIT_ON_FAILED_REQUEST:
            sys.exit(r.status_code)
        failed_requests.append((route, r.status_code))
    return json.loads(content)


default_request_method: Callable[[str, Dict], Dict] = http_request


# default_request_method = websocket_request


def random_time():
    """Return a random time span as a dict with 'dt_start' and 'dt_end'.

    The start is 1561960800 plus a random multiple (0..139) of 21600 seconds;
    the end lies 1800, 3600, 7200 or 86400 seconds after the start.
    """
    start = random.randrange(140)
    start = 1561960800 + start * 21600  # somewhere in july or at the beginning of august 2019
    return {
        'dt_start': start,
        'dt_end': start + random.choice([1800, 3600, 7200, 86400]),
    }


def run_tests():
    """Run a fixed sequence of requests through ``default_request_method``.

    First calls the 'news', 'leaderboard' and 'tradables' routes with an empty
    message. Then, for two freshly named users: register, login, logout,
    login again, query 'depot', 'orders' and 'orders_on', change the password
    to 'pw2' and back to ``DEFAULT_PW``, logout, login, and query 'trades'
    with several limits (asserting that no more entries than the limit are
    returned). Finally logs out every session that is still stored.
    """
    print('You are currently in debug mode.')
    print('Host:', str(HOST))

    usernames = [f'user{datetime.now().timestamp()}', f'user{datetime.now().timestamp()}+1']

    # Maps each username to its most recently obtained session id.
    session_ids = {}

    message = {}
    route = 'news'
    default_request_method(route, message)

    message = {}
    route = 'leaderboard'
    default_request_method(route, message)

    message = {}
    route = 'tradables'
    default_request_method(route, message)

    for username in usernames:
        message = {'username': username, 'password': DEFAULT_PW}
        route = 'register'
        default_request_method(route, message)

        message = {'username': username, 'password': DEFAULT_PW}
        route = 'login'
        session_ids[username] = default_request_method(route, message)['session_id']

        message = {'session_id': session_ids[username]}
        route = 'logout'
        default_request_method(route, message)

        message = {'username': username, 'password': DEFAULT_PW}
        route = 'login'
        session_ids[username] = default_request_method(route, message)['session_id']

        message = {'session_id': session_ids[username]}
        route = 'depot'
        default_request_method(route, message)

        message = {'session_id': session_ids[username]}
        route = 'orders'
        default_request_method(route, message)

        message = {'session_id': session_ids[username], "ownable": "\u20adollar"}
        route = 'orders_on'
        default_request_method(route, message)

        # Change the password and then restore the default one, so that the
        # login below (which uses DEFAULT_PW) works again.
        for password in ['pw2', DEFAULT_PW]:
            message = {'session_id': session_ids[username], 'password': password}
            route = 'change_password'
            default_request_method(route, message)

        message = {'session_id': session_ids[username]}
        route = 'logout'
        default_request_method(route, message)

        message = {'username': username, 'password': DEFAULT_PW}
        route = 'login'
        session_ids[username] = default_request_method(route, message)['session_id']

        for limit in [0, 5, 10, 20, 50]:
            message = {'session_id': session_ids[username], 'limit': limit}
            route = 'trades'
            data = default_request_method(route, message)['data']
            assert len(data) <= limit

    # Iterate over the stored session ids. Iterating the dict itself yields
    # the usernames, which the original code sent as 'session_id'.
    for session_id in session_ids.values():
        message = {'session_id': session_id}
        route = 'logout'
        default_request_method(route, message)


def main():
    """Run the test sequence with each configured request method and exit.

    The process exit code is the number of entries in ``failed_requests``.
    """
    global default_request_method
    # NOTE(review): the request methods are looked up via the package path;
    # presumably this file is the package's __init__ module - confirm.
    for m in [
        test.do_some_requests.websocket_request,
        # test.do_some_requests.http_request
    ]:
        # print('Removing existing database (if exists)...', end='')
        # try:
        #     os.remove(DB_NAME + '.db')
        # except PermissionError:
        #     print('Could not recreate database')
        #     sys.exit(-1)
        # except FileNotFoundError:
        #     pass
        # print('done')
        default_request_method = m
        start = perf_counter()
        run_tests()
        print()
        print('Failed requests:', failed_requests)
        print('Total time:' + str(round_to_n(perf_counter() - start, 4)) + 's,')
        if _current_websocket().connected:
            _current_websocket().close()
    sys.exit(len(failed_requests))


if __name__ == '__main__':
    main()