sh-4.2$ cat failover_test.py
#!/usr/bin/env python3
import argparse
import logging
import pprint
import sys
import time
import traceback

from db_test_meter.database import Database
from db_test_meter.test_run import TestRun
from db_test_meter.util import init_logger, collect_user_input

parser = argparse.ArgumentParser('This will gather metrics of a failover event')
parser.add_argument('--test_run_id', metavar='<test run id>', type=str, nargs='?', required=True,
                    help='a unique identifier for this test run')
parser.add_argument('--loop_time', metavar='<seconds>', type=float, nargs='?', default=0.5,
                    help='sleep is used to ensure this minimum loop time in sec. Can be decimal (defaults to 0.5)')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
test_run_id = args.test_run_id
loop_time = args.loop_time
if loop_time <= 0:
    print('Loop time must be > 0, exiting...')
    exit(1)

init_logger(debug=args.debug)
log = logging.getLogger()

db_connection_metadata = collect_user_input()
db = Database(db_connection_metadata)
test_runner = TestRun(db)

if not test_runner.test_db_connection():
    log.fatal('Initial db connection failed. Check your connection setup and try again. Exiting...')
    exit(1)

pre_failure_db_node_hostname = test_runner.get_db_node_hostname()
print(f'Test starting, initial Db node hostname: {pre_failure_db_node_hostname}')
post_failure_db_node_hostname = None

try:
    while True:
        loop_start_time = time.time()
        test_runner.ensure_minumum_loop_time(loop_time, loop_start_time, test_runner.prev_loop_end_time)
        if test_runner.db_node_heartbeat(test_run_id):
            if test_runner.recovery_detected():
                test_runner.failure_condition_end_time = time.time()
                post_failure_db_node_hostname = test_runner.get_db_node_hostname()
                test_runner.prev_loop_end_time = time.time()
                break
        test_runner.prev_loop_end_time = time.time()
except Exception as e:
    print(f'There was an unexpected exception: {e}')
    print("-" * 60)
    traceback.print_exc(file=sys.stdout)
    print("-" * 60)
    exit(1)
finally:
    test_runner.shutdown()

pp = pprint.PrettyPrinter(indent=2)
print('\n========================================')
print(f'Total Db connection attempts: {test_runner.success_connect_count + test_runner.failed_connect_count}')
print(f'Successful Db connections: {test_runner.success_connect_count}')
print(f'Failed Db connections: {test_runner.failed_connect_count}')
print(f'failure_start_time: {time.ctime(test_runner.failure_condition_start_time)}')
print(f'failure_end_time: {time.ctime(test_runner.failure_condition_end_time)}')
duration = int(test_runner.failure_condition_end_time - test_runner.failure_condition_start_time)
print(f'failure condition duration: {duration} seconds')
print(f'Last inserted sync record id on initial primary db node: {test_runner.last_inserted_heartbeat_index}')
print(f'Pre-failure Db node hostname: {pre_failure_db_node_hostname}')
print(f'Post-failure Db node hostname: {post_failure_db_node_hostname}')
print(f'Newest 5 sync records in current primary db node:')
pp.pprint(test_runner.get_last_sync_records(test_run_id, 5))
sh-4.2$
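For reference, a typical invocation might look like the line below. The flags come straight from the argparse definitions above; the test_run_id value is just an example, and the sync database/table written to by the heartbeat must already exist (see create_failover_sync_db.py next).

sh-4.2$ ./failover_test.py --test_run_id failover_run_01 --loop_time 0.5 --debug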
sh-4.2$ cat create_failover_sync_db.py
#!/usr/bin/env python3
import argparse
import logging

from db_test_meter.database import Database
from db_test_meter.util import init_logger, collect_user_input, AppConfig


def create_db(db: Database) -> None:
    """
    Utility to create the db and table for the sync check
    :param db:
    :return:
    """
    try:
        log.debug(f'creating database {AppConfig.TEST_DB_NAME}')
        db.run_query(f"DROP DATABASE IF EXISTS {AppConfig.TEST_DB_NAME}")
        db.run_query(f"CREATE DATABASE IF NOT EXISTS {AppConfig.TEST_DB_NAME}")
        log.debug(f'creating table {AppConfig.TEST_DB_TABLE}')
        db.run_query(
            f"CREATE TABLE {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} (`test_run_id` varchar(50) NOT NULL, `index_id` int(10) unsigned NOT NULL, `created` int(8) NOT NULL)")
        print(f'Database {AppConfig.TEST_DB_NAME} created')
        print(f'Table {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} created')
    except Exception as e:
        print(f'There was an error: {e}')


parser = argparse.ArgumentParser(
    'simple utility to create the db and table used by failover_test.py. Usage: ./create_failover_sync_db.py')
parser.add_argument('--debug', action='store_true')
init_logger(debug=parser.parse_args().debug)
log = logging.getLogger()

print('This will destroy and recreate sync database and tracking table')
if (input("enter y to continue, n to exit [n]: ") or 'n').lower() == 'y':
    db_connection_metadata = collect_user_input()
    db = Database(db_connection_metadata)
    create_db(db)
else:
    print('exiting...')
sh-4.2$
sh-4.2$ cat database.py
import logging

import pymysql


class Database:
    """Database connection class."""

    def __init__(self, db_connection_metadata):
        self.host = db_connection_metadata['db_host']
        self.port = int(db_connection_metadata['db_port'])
        self.username = db_connection_metadata['db_user']
        self.password = db_connection_metadata['db_password']
        self.charset = 'utf8mb4'
        self.cursorclass = pymysql.cursors.DictCursor
        self.read_timeout = db_connection_metadata['db_interact_timeout']     # 1 sec
        self.write_timeout = db_connection_metadata['db_interact_timeout']    # 1 sec
        self.connect_timeout = db_connection_metadata['db_interact_timeout']  # 1 sec
        self.ssl_metadata = db_connection_metadata['ssl_metadata']
        self.conn = None

    def open_connection(self):
        if self.conn is None:
            logging.debug('opening db connection')
            self.conn = pymysql.connect(
                host=self.host,
                port=self.port,
                user=self.username,
                password=self.password,
                charset='utf8mb4',
                cursorclass=pymysql.cursors.DictCursor,
                read_timeout=self.read_timeout,        # 1 sec
                write_timeout=self.write_timeout,      # 1 sec
                connect_timeout=self.connect_timeout,  # 1 sec
                ssl=self.ssl_metadata
            )
            logging.debug('Connection opened successfully.')

    def run_query(self, query, query_params=None):
        try:
            self.open_connection()
            with self.conn.cursor() as cur:
                if 'SELECT' in query or 'SHOW' in query:
                    records = []
                    logging.debug(f'executing query: {query} params:{query_params}')
                    cur.execute(query, query_params)
                    result = cur.fetchall()
                    for row in result:
                        records.append(row)
                    logging.debug('closing db connection')
                    return records
                else:
                    logging.debug(f'executing query: {query} params:{query_params}')
                    cur.execute(query, query_params)
                    self.conn.commit()
                    affected = f"{cur.rowcount} rows affected."
                    logging.debug('closing db connection')
                    return affected
        except pymysql.OperationalError as e:
            # OperationalError is a subclass of MySQLError, so it must be caught first
            # or this handler is unreachable
            print(e)
            raise Exception('Query failed to write')
        except pymysql.MySQLError as e:
            print(e)
            raise Exception('Db Connection failed')

    def close_connection(self):
        if self.conn:
            self.conn.close()
            self.conn = None
            logging.info('Database connection closed.')
sh-4.2$
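The Database class can also be exercised on its own. The sketch below is only an illustration: the connection values are hypothetical, while the dictionary keys and the return shapes (list of dict rows for SELECT/SHOW, an "N rows affected." string otherwise) come from the constructor and run_query() above.

from db_test_meter.database import Database

# Hypothetical connection values; only the key names are taken from database.py
db_connection_metadata = {
    'db_host': '127.0.0.1',
    'db_port': 3306,
    'db_user': 'test_user',
    'db_password': 'test_password',
    'db_interact_timeout': 1,  # seconds, used for connect/read/write timeouts
    'ssl_metadata': None,      # or {'ssl': {'ca': './rds-combined-ca-bundle.pem'}}
}

db = Database(db_connection_metadata)
print(db.run_query('SELECT version()'))  # -> list of dict rows
db.close_connection()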
sh-4.2$ cat test_run.py
import time

from db_test_meter.database import Database
from db_test_meter.util import log, AppConfig


class TestRun:
    def __init__(self, db: Database):
        self.db = db
        self.success_connect_count: int = 0
        self.failed_connect_count: int = 0
        self.current_phase: str = 'INIT'
        self.prev_loop_end_time: float = 0
        self.failure_condition_start_time: float = 0
        self.failure_condition_end_time: float = 0
        self.heartbeat_index = 0
        self.last_inserted_heartbeat_index = 0

    def test_db_connection(self) -> bool:
        try:
            self.db.run_query('SELECT version()')
            print(f'Connection succeeded at {time.ctime()}')
            self.success_connect_count += 1
            return True
        except Exception as e:
            print(f'There was an error: {e}')
            if self.current_phase == 'INIT':
                self.failure_condition_start_time = time.time()
                self.current_phase = 'FAILING'
            self.failed_connect_count += 1
            if self.failed_connect_count <= 600:  # limit error state to ~10 minutes
                return False
            else:
                log.fatal('Maximum Db connection failures of 600 occurred, exiting...')
                exit(1)

    def get_db_node_hostname(self):
        query = "SHOW variables LIKE 'hostname'"
        result = self.db.run_query(query)
        if result and 'Value' in result[0]:
            db_node_hostname = result[0]["Value"]
            log.debug(f'Db node Hostname: {db_node_hostname}')
        else:
            raise Exception(f'Unable to retrieve db node hostname with query: {query}')
        return db_node_hostname

    def db_node_heartbeat(self, test_run_id: str) -> bool:
        try:
            if self.current_phase == 'FAILING':
                return self.test_db_connection()
            else:
                self.heartbeat_index += 1
                self.db.run_query(
                    f"INSERT INTO {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} SET test_run_id=%s, index_id=%s, created=UNIX_TIMESTAMP()",
                    (test_run_id, self.heartbeat_index,))
                print(f'Insert succeeded at {time.ctime()} test_run_id: {test_run_id}, index_id:{self.heartbeat_index}')
                self.last_inserted_heartbeat_index = self.heartbeat_index
                self.success_connect_count += 1
                return True
        except Exception as e:
            print(f'There was an error: {e}')
            if self.current_phase == 'INIT':
                self.failure_condition_start_time = time.time()
                time.sleep(120)
                self.current_phase = 'FAILING'
            # we've failed so kill this connection
            self.db.close_connection()
            self.failed_connect_count += 1
            if self.failed_connect_count <= 600:  # limit error state to ~10 minutes
                return False
            else:
                log.fatal('Maximum Db connection failures of 600 occurred, exiting...')
                exit(1)

    def recovery_detected(self) -> bool:
        if self.current_phase == 'FAILING':
            # we've recovered
            log.debug('moving from phase FAILING -> RECOVERED')
            self.current_phase = 'RECOVERED'
            return True
        return False

    def ensure_minumum_loop_time(self, loop_time_min_in_sec: float, loop_start_time: float, prev_loop_end_time: float):
        if prev_loop_end_time != 0:
            log.debug(f'this loop start time: {loop_start_time}')
            log.debug(f'prev loop end time: {prev_loop_end_time}')
            last_loop_runtime = loop_start_time - prev_loop_end_time
            log.debug(f'last loop runtime: {last_loop_runtime}')
            if last_loop_runtime < loop_time_min_in_sec:
                sleep_time = loop_time_min_in_sec - last_loop_runtime
                log.debug(f'sleeping {sleep_time}')
                time.sleep(sleep_time)

    def get_last_sync_records(self, test_run_id: str, number_of_records: int) -> list:
        result = self.db.run_query(
            f'SELECT * FROM {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} WHERE test_run_id = %s ORDER BY `index_id` DESC LIMIT %s',
            (test_run_id, number_of_records))
        return result

    def shutdown(self):
        self.db.close_connection()
sh-4.2$
sh-4.2$ cat util.py
import getpass
import json
import logging
import os
import sys

import boto3

client = boto3.client('secretsmanager')
log = logging.getLogger()


class AppConfig:
    TEST_DB_NAME = 'db_test_meter'
    TEST_DB_TABLE = 'db_sync'


def init_logger(debug=False) -> None:
    log_level = logging.DEBUG if debug else logging.WARNING
    logging.getLogger().setLevel(log_level)
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(log_level)
    log.addHandler(handler)


def collect_user_input() -> dict:
    user_input = {'ssl_metadata': None}
    user_input['db_interact_timeout'] = 1  # 1 sec

    response = client.list_secrets(MaxResults=1)
    user_input['secret_arn'] = response['SecretList'][0]['ARN']
    user_input['secret_versionId'] = response['SecretList'][0]['SecretVersionsToStages']

    response = client.get_secret_value(SecretId=user_input['secret_arn'])
    secretString = json.loads(response['SecretString'])
    user_input['db_user'] = secretString['username']
    user_input['db_password'] = secretString['password']
    user_input['db_host'] = secretString['host']
    user_input['db_port'] = secretString['port']

    using_ssl = input('Connecting over SSL (y/n) [y]: ').strip().lower() or 'y'
    if using_ssl == 'y':
        path_to_ssl_cert = input('path to ssl cert [./rds-combined-ca-bundle.pem]: ') or './rds-combined-ca-bundle.pem'
        if not os.path.exists(os.path.abspath(path_to_ssl_cert)):
            log.fatal(f'SSL cert not found at: {path_to_ssl_cert}')
            exit(1)
        user_input['ssl_metadata'] = {'ssl': {'ca': path_to_ssl_cert}}

    print(user_input['db_host'])
    return user_input
sh-4.2$
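Note that collect_user_input() simply takes the first secret returned by list_secrets, so the account is assumed to hold an RDS-style secret whose SecretString parses into at least the keys read above. Roughly, with placeholder values:

# Hypothetical result of json.loads(response['SecretString']); only the key names are taken from util.py
secretString = {
    'username': 'admin',                 # -> db_user
    'password': '<db password>',         # -> db_password
    'host': '<rds endpoint hostname>',   # -> db_host
    'port': 3306,                        # -> db_port
}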
from __future__ import print_function  # Python 2/3 compatibility
import boto3
import json
import decimal
import time
import os

count = 0
session = boto3.session.Session()
region = session.region_name
dynamodb = boto3.resource('dynamodb', region_name=region)

table_name = 'Cast'           # table name
pk = 'year'                   # primary key
sk = 'title'                  # sort key
file_name = 'cast_full.json'


def create_table():
    try:
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[
                {'AttributeName': pk, 'KeyType': 'HASH'},   # Partition key
                {'AttributeName': sk, 'KeyType': 'RANGE'}   # Sort key
            ],
            AttributeDefinitions=[
                {'AttributeName': 'year', 'AttributeType': 'N'},
                {'AttributeName': 'title', 'AttributeType': 'S'},
            ],
            BillingMode='PAY_PER_REQUEST'
            # ProvisionedThroughput={
            #     'ReadCapacityUnits': 125,
            #     'WriteCapacityUnits': 125
            # }
        )
        print("Table status:", table.table_status)
    except Exception:
        print("Table exists: uploading data")
        table = dynamodb.Table('Cast')


def add_table():
    table = dynamodb.Table(table_name)
    count = 0
    with open(file_name) as json_file:
        movies = json.load(json_file, parse_float=decimal.Decimal)
        with table.batch_writer(overwrite_by_pkeys=[pk, sk]) as batch:
            for movie in movies:
                titleId = movie['titleId']
                title = movie['title']
                year = int(movie['year'])
                genres = movie['genres']
                runtimeMinutes = int(movie['runtimeMinutes'])
                cast = movie['cast']
                count = count + 1
                print("Adding record count:", count)
                batch.put_item(
                    Item={
                        'titleId': titleId,
                        'year': year,
                        'title': title,
                        'genres': genres,
                        'runtimeMinutes': runtimeMinutes,
                        'cast': cast,
                    }
                )


def main():
    create_table()
    add_table()


if __name__ == "__main__":
    main()
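cast_full.json itself is not shown in the post. Based on the fields the loader reads, and the j['name'] access in the scan/query scripts below, each entry presumably looks something like the following (values invented for illustration):

# Hypothetical shape of cast_full.json
movies = [
    {
        "titleId": "tt0000001",
        "title": "A Sample Movie",
        "year": 1990,
        "runtimeMinutes": 95,
        "genres": ["Drama"],
        "cast": [
            {"name": "Jane Doe"},
            {"name": "John Doe"}
        ]
    }
]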
from __future__ import print_function  # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr

table_name = 'Cast'
pk = 'year'
sk = 'title'

session = boto3.session.Session()
region = session.region_name


class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            if o % 1 > 0:
                return float(o)
            else:
                return int(o)
        return super(DecimalEncoder, self).default(o)


dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table(table_name)

fe = Key(pk).between(1990, 1991) & Key(sk).between('A', 'D')
pe = "#yr, title, #ca"
ean = {"#yr": "year", "#ca": "cast"}
esk = None

response = table.scan(
    FilterExpression=fe,
    ProjectionExpression=pe,
    ExpressionAttributeNames=ean
)

for i in response['Items']:
    print(json.dumps(i, cls=DecimalEncoder))

while 'LastEvaluatedKey' in response:
    response = table.scan(
        ProjectionExpression=pe,
        FilterExpression=fe,
        ExpressionAttributeNames=ean,
        ExclusiveStartKey=response['LastEvaluatedKey']
    )
    # parsing and printing the JSON response
    for i in response['Items']:
        print(i['year'], ":", i['title'] + " and the actors are:")
        for j in i['cast']:
            print(j['name'])
        print('\n')
from __future__ import print_function  # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr

session = boto3.session.Session()
region = session.region_name

table_name = 'Cast'
pk = 'year'
sk = 'title'


class DecimalEncoder(json.JSONEncoder):
    def default(self, o):
        if isinstance(o, decimal.Decimal):
            return str(o)
        return super(DecimalEncoder, self).default(o)


dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table(table_name)

print("Movies in 2005 - titles A-L, and list of actors")

response = table.query(
    ProjectionExpression="#yr, title, #ca",
    ExpressionAttributeNames={"#yr": "year", "#ca": "cast"},
    KeyConditionExpression=Key(pk).eq(2005) & Key(sk).between('A', 'L')
)

for i in response['Items']:
    print(i['year'], ":", i['title'] + " and the actors are:")
    for j in i['cast']:
        print(j['name'])
    print('\n')
from __future__ import print_function
import os
import amazondax
import botocore.session
import boto3

my_session = boto3.session.Session()
region = my_session.region_name

session = botocore.session.get_session()
dynamodb = session.create_client('dynamodb', region_name=region)  # low-level client

table_name = "TryDaxTable"
params = {
    'TableName': table_name,
    'KeySchema': [
        {'AttributeName': "pk", 'KeyType': "HASH"},   # Partition key
        {'AttributeName': "sk", 'KeyType': "RANGE"}   # Sort key
    ],
    'AttributeDefinitions': [
        {'AttributeName': "pk", 'AttributeType': "N"},
        {'AttributeName': "sk", 'AttributeType': "N"}
    ],
    'ProvisionedThroughput': {
        'ReadCapacityUnits': 10,
        'WriteCapacityUnits': 10
    }
}

# Create the table
dynamodb.create_table(**params)

# Wait for the table to exist before exiting
print('Waiting for', table_name, '...')
waiter = dynamodb.get_waiter('table_exists')
waiter.wait(TableName=table_name)
[ssm-user@ip-10-0-1-89 python]$
from __future__ import print_function
import os, sys, time
import amazondax
import botocore.session
import boto3

my_session = boto3.session.Session()
region = my_session.region_name

session = botocore.session.get_session()
dynamodb = session.create_client('dynamodb', region_name=region)  # low-level client

table_name = "TryDaxTable"

if len(sys.argv) > 1:
    endpoint = sys.argv[1]
    dax = amazondax.AmazonDaxClient(session, region_name=region, endpoints=[endpoint])
    client = dax
else:
    client = dynamodb

pk = 10
sk = 10
iterations = 50

start = time.time()
for i in range(iterations):
    for ipk in range(1, pk+1):
        for isk in range(1, sk+1):
            params = {
                'TableName': table_name,
                'Key': {
                    "pk": {'N': str(ipk)},
                    "sk": {'N': str(isk)}
                }
            }
            result = client.get_item(**params)
            print('.', end='', file=sys.stdout)
            sys.stdout.flush()

print()
end = time.time()

print('Total time: {} sec - Avg time: {} sec'.format(end - start, (end - start) / iterations))
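As the argv check above shows, the script reads DynamoDB directly when run without arguments and routes the same GetItem calls through DAX when a cluster endpoint is passed as the first argument. A usage sketch, where both the script name and the endpoint are placeholders:

[ssm-user@ip-10-0-1-89 python]$ python try_dax_getitem.py
[ssm-user@ip-10-0-1-89 python]$ python try_dax_getitem.py <dax-cluster-endpoint>:8111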