123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- import json
- import random
- import re
- import sys
- from datetime import datetime
- from time import perf_counter
- from typing import Dict, Callable
- from uuid import uuid4
- import requests
- import connection
- import test.do_some_requests.current_websocket
- from game import random_ownable_name
- from test import failed_requests
- from util import round_to_n
# Password given to every account that run_tests() registers.
DEFAULT_PW = 'pw'

# Address of the server under test: the local machine, on the port taken from `connection`.
PORT = connection.PORT
HOST = 'http://127.0.0.1' + ':' + str(PORT)
# HOST = 'http://koljastrohm-games.com' + ':' + str(PORT)
JSON_HEADERS = {'Content-type': 'application/json'}

# If true, the first response with a status other than 200 or 451 ends the process via sys.exit.
EXIT_ON_FAILED_REQUEST = True

# Websocket responses that arrived while a different request token was awaited, keyed by token.
response_collection: Dict[str, Dict] = {}

# The transport run_tests() sends through; it is bound further down, once the transports exist.
default_request_method: Callable[[str, Dict], Dict]
def receive_answer(token):
    """Return the websocket message whose ``request_token`` equals ``token``.

    A message already stored in ``response_collection`` under that token is
    removed from the collection and returned straight away. Otherwise frames
    are read from the current websocket until the matching message shows up.
    Every frame read is printed; messages carrying a different token are
    stored in ``response_collection`` for later calls, and messages without a
    token are only printed.
    """
    if token in response_collection:
        return response_collection.pop(token)

    websocket = test.do_some_requests.current_websocket.current_websocket
    while True:
        frame = websocket.recv_data_frame()[1]
        text = frame.data.decode('utf-8')
        # Start every JSON object on its own line to keep the log readable.
        print('Received through websocket: ' + re.sub(r'{([^}]*?):(.*?)}', r'\n{\g<1>:\g<2>}', text))
        message = json.loads(text)
        if 'request_token' not in message:
            continue
        if message['request_token'] == token:
            return message
        # Belongs to another request: keep it until that request asks for it.
        response_collection[message['request_token']] = message
def websocket_request(route: str, data: Dict) -> Dict:
    """Send one request through the shared websocket and return the response body.

    The websocket is connected first if it is not connected yet. The request is
    sent as a JSON object holding the route, the body and a fresh request
    token; the answer carrying the same token is then awaited via
    ``receive_answer``.

    Args:
        route: Name of the server route to call.
        data: JSON-serialisable request body.

    Returns:
        The ``body`` entry of the server's answer.

    Raises:
        SystemExit: If the answer's ``http_status_code`` is neither 200 nor 451
            and ``EXIT_ON_FAILED_REQUEST`` is set; the status is the exit code.

    A 451 answer makes the same request go out again. Any other non-200 answer
    is recorded in ``failed_requests`` when the process is not terminated.
    """
    websocket = test.do_some_requests.current_websocket.current_websocket
    if not websocket.connected:
        websocket.connect(HOST.replace('http://', 'ws://') + '/websocket')

    token = str(uuid4())
    payload = json.dumps({'route': route, 'body': data, 'request_token': token})
    # The payload starts with '{', so the replace puts a newline first; [1:] drops it again.
    print('Sending to websocket:', payload.replace('{', '\n{')[1:])
    websocket.send(payload, opcode=2)  # opcode 2 is the binary frame type
    json_content = receive_answer(token)
    print()

    status_code = json_content['http_status_code']
    if status_code == 451:  # Copyright problems, likely a bug in the upload filter
        # Try again
        return websocket_request(route, data)
    if status_code != 200:
        if EXIT_ON_FAILED_REQUEST:
            # Close the connection before exiting. The check used to be negated,
            # so an open connection was never closed on this path.
            if websocket.connected:
                websocket.close()
            sys.exit(status_code)
        failed_requests.append((route, status_code))
    return json_content['body']
def http_request(route: str, data: Dict) -> Dict:
    """POST ``data`` as JSON to ``/json/<route>`` on ``HOST`` and return the decoded reply.

    Both the request and the raw response are printed. A 451 response makes
    the same request go out again. Any other non-200 response either ends the
    process with the status as exit code (when ``EXIT_ON_FAILED_REQUEST`` is
    set) or is recorded in ``failed_requests``; in the latter case the response
    body is still decoded and returned.
    """
    payload = json.dumps(data)
    # The payload starts with '{', so the replace puts a newline first; [1:] drops it again.
    print('Sending to /' + route + ':', payload.replace('{', '\n{')[1:])
    response = requests.post(HOST + '/json/' + route, data=payload, headers=JSON_HEADERS)
    text = response.content.decode()
    print('Request returned: ' + text.replace('{', '\n{'))
    print()

    status = response.status_code
    if status == 451:
        return http_request(route, data)
    if status != 200:
        if EXIT_ON_FAILED_REQUEST:
            sys.exit(status)
        failed_requests.append((route, status))
    return json.loads(text)
# Transport that run_tests() sends through by default; main() rebinds it before each run.
# The name is already annotated where it is first declared near the top of the module,
# so it is assigned here without repeating the annotation (type checkers report a
# second annotated declaration of the same name as a redefinition).
default_request_method = http_request
# default_request_method = websocket_request
def random_time():
    """Return a random time span as ``{'dt_start': ..., 'dt_end': ...}`` in epoch seconds.

    The start is picked from 140 slots spaced 21600 seconds (six hours) apart,
    beginning at 1561960800, i.e. somewhere in July or at the beginning of
    August 2019. The span lasts 1800, 3600, 7200 or 86400 seconds.
    """
    first_slot = 1561960800
    slot_spacing = 21600
    # Draw the slot before the duration so the random sequence is consumed in a fixed order.
    slot_index = random.randrange(140)
    begin = first_slot + slot_index * slot_spacing
    duration = random.choice([1800, 3600, 7200, 86400])
    return {'dt_start': begin, 'dt_end': begin + duration}
def run_tests():
    """Send the scripted sequence of requests to the server at ``HOST``.

    Every request goes through ``default_request_method``. The sequence is:

    1. the ``news``, ``leaderboard`` and ``tradables`` routes with an empty body;
    2. for each of two freshly named users: register, login, logout, login
       again, query depot / orders / orders_on, change the password to ``pw2``
       and back (logging in after each change), and query ``trades`` with
       several limits, asserting that no more entries than the limit come back;
    3. for the first user only: take out a loan, buy a banking license, issue
       a bond and list that issuer's bonds with each filter variant;
    4. list all bonds with each filter variant;
    5. log out every session.
    """
    def call(route, **body):
        # The global is resolved on every call, so whatever transport is bound
        # at that moment is the one that gets used.
        return default_request_method(route, body)

    print('You are currently in debug mode.')
    print('Host:', str(HOST))

    first_user = f'user{datetime.now().timestamp()}'
    second_user = f'user{datetime.now().timestamp()}+1'
    usernames = [first_user, second_user]
    banks = usernames[:1]
    session_ids = {}

    def login(username, password):
        # Remember the newest session id of the user for the requests that follow.
        session_ids[username] = call('login', username=username, password=password)['session_id']

    for public_route in ('news', 'leaderboard', 'tradables'):
        call(public_route)

    for username in usernames:
        call('register', username=username, password=DEFAULT_PW)
        login(username, DEFAULT_PW)
        call('logout', session_id=session_ids[username])
        login(username, DEFAULT_PW)

        for account_route in ('depot', 'orders'):
            call(account_route, session_id=session_ids[username])
        call('orders_on', session_id=session_ids[username], ownable="\u20adollar")

        # Change the password away from the default and back, logging in after each change.
        for password in ('pw2', DEFAULT_PW):
            call('change_password', session_id=session_ids[username], password=password)
            login(username, password)

        for limit in (0, 5, 10, 20, 50):
            trades = call('trades', session_id=session_ids[username], limit=limit)['data']
            assert len(trades) <= limit

    for username in banks:
        call('take_out_personal_loan', session_id=session_ids[username], amount=5.5e6)
        call('buy_banking_license', session_id=session_ids[username])
        call('issue_bond',
             session_id=session_ids[username],
             coupon=1.05,
             name=random_ownable_name(),
             run_time=43200)
        call('bonds', issuer=username)
        call('bonds', issuer=username, only_next_mro_qualified=True)
        call('bonds', issuer=username, only_next_mro_qualified=False)

    call('bonds')
    call('bonds', only_next_mro_qualified=True)
    call('bonds', only_next_mro_qualified=False)

    for session_id in session_ids.values():
        call('logout', session_id=session_id)
def main():
    """Run the request script once per configured transport, then exit.

    For each transport the module-level ``default_request_method`` is rebound,
    ``run_tests`` is executed, and the failed requests plus the elapsed time
    are printed. Afterwards the websocket is closed if it is connected and the
    process exits with the number of failed requests as its exit code.
    """
    global default_request_method

    request_methods = [
        test.do_some_requests.websocket_request,
        # test.do_some_requests.http_request
    ]
    for request_method in request_methods:
        # print('Removing existing database (if exists)...', end='')
        # try:
        #     os.remove(DB_NAME + '.db')
        # except PermissionError:
        #     print('Could not recreate database')
        #     sys.exit(-1)
        # except FileNotFoundError:
        #     pass
        # print('done')

        default_request_method = request_method
        started_at = perf_counter()
        run_tests()

        print()
        print('Failed requests:', failed_requests)
        elapsed = round_to_n(perf_counter() - started_at, 4)
        print('Total time:' + str(elapsed) + 's,')

    websocket = test.do_some_requests.current_websocket.current_websocket
    if websocket.connected:
        websocket.close()
    sys.exit(len(failed_requests))


if __name__ == '__main__':
    main()
|