h-4.2$ cat failover_test.py
#!/usr/bin/env python3

import argparse
import logging
import pprint
import sys
import time
import traceback

from db_test_meter.database import Database
from db_test_meter.test_run import TestRun
from db_test_meter.util import init_logger, collect_user_input

# Command-line interface: a mandatory run identifier, an optional minimum loop
# period (seconds) and a debug switch.
# The text is passed as ``description``; the first positional parameter of
# ArgumentParser is ``prog`` and would replace the program name in the usage line.
parser = argparse.ArgumentParser(description='This will gather metrics of a failover event')
# No nargs='?' on either option: with it, passing the flag without a value
# silently produced None instead of an argparse error.
parser.add_argument('--test_run_id', metavar='<test run id>', type=str, required=True,
                    help='a unique identifier for this test run')
parser.add_argument('--loop_time', metavar='<seconds>', type=float, default=0.5,
                    help='sleep is used to ensure this minimum loop time in sec. Can be decimal (defaults to .5)')
parser.add_argument('--debug', action='store_true')
args = parser.parse_args()
test_run_id = args.test_run_id
loop_time = args.loop_time
# "not > 0" also rejects NaN, which "<= 0" would let through.
if not loop_time > 0:
    print('Loop time must be > 0, exiting...')
    sys.exit(1)
init_logger(debug=args.debug)
log = logging.getLogger()

# Gather the connection details and build the objects that drive the test.
db_connection_metadata = collect_user_input()
db = Database(db_connection_metadata)
test_runner = TestRun(db)

pre_failure_db_node_hostname = None
post_failure_db_node_hostname = None

# One outer try/finally owns the connection lifetime. The final report still
# queries the database, so the connection is released only after the report
# has been printed (or on any early exit), never before it.
try:
    if not test_runner.test_db_connection():
        log.fatal('Initial db connection failed.  Check you connection setup and try again. Exiting...')
        sys.exit(1)

    pre_failure_db_node_hostname = test_runner.get_db_node_hostname()
    print(f'Test starting, initial Db node hostname: {pre_failure_db_node_hostname}')

    try:
        # Heartbeat until a failure has been observed AND a later attempt
        # succeeds again (recovery_detected() is only true after a failure).
        while True:
            loop_start_time = time.time()
            test_runner.ensure_minumum_loop_time(loop_time, loop_start_time, test_runner.prev_loop_end_time)
            if test_runner.db_node_heartbeat(test_run_id) and test_runner.recovery_detected():
                test_runner.failure_condition_end_time = time.time()
                post_failure_db_node_hostname = test_runner.get_db_node_hostname()
                test_runner.prev_loop_end_time = time.time()
                break
            test_runner.prev_loop_end_time = time.time()
    except Exception as e:
        print(f'There was an unexpected exception: {e}')
        print("-" * 60)
        traceback.print_exc(file=sys.stdout)
        print("-" * 60)
        sys.exit(1)

    # Summary of the run.
    pp = pprint.PrettyPrinter(indent=2)
    print('\n========================================')
    print(f'Total Db connection attempts: {test_runner.success_connect_count + test_runner.failed_connect_count}')
    print(f'Successful Db connections: {test_runner.success_connect_count}')
    print(f'Failed Db connections: {test_runner.failed_connect_count}')
    print(f'failure_start_time: {time.ctime(test_runner.failure_condition_start_time)}')
    print(f'failure_end_time: {time.ctime(test_runner.failure_condition_end_time)}')
    duration = int(test_runner.failure_condition_end_time - test_runner.failure_condition_start_time)
    print(f'failure condition duration: {duration} seconds')
    print(f'Last inserted sync record id on initial primary db node: {test_runner.last_inserted_heartbeat_index}')
    print(f'Pre-failure Db node hostname: {pre_failure_db_node_hostname}')
    print(f'Post-failure Db node hostname: {post_failure_db_node_hostname}')
    print('Newest 5 sync records in current primary db node:')
    pp.pprint(test_runner.get_last_sync_records(test_run_id, 5))
finally:
    test_runner.shutdown()
sh-4.2$
 
h-4.2$ cat create_failover_sync_db.py
#!/usr/bin/env python3

import argparse
import logging

from db_test_meter.database import Database
from db_test_meter.util import init_logger, collect_user_input, AppConfig


def create_db(db: Database) -> None:
    """
    Drop and recreate the sync-check database, then create its tracking table.

    Any exception raised along the way is printed to stdout and not re-raised.

    :param db: connection wrapper whose ``run_query`` executes each statement
    :return: None
    """
    try:
        schema = AppConfig.TEST_DB_NAME
        log.debug(f'creating database {schema}')
        # Start from a clean slate: drop first, then create.
        for statement in (f"DROP DATABASE IF EXISTS {schema}",
                          f"CREATE DATABASE IF NOT EXISTS {schema}"):
            db.run_query(statement)

        log.debug(f'creating table {AppConfig.TEST_DB_TABLE}')
        qualified_table = f'{schema}.{AppConfig.TEST_DB_TABLE}'
        column_defs = ', '.join((
            "`test_run_id` varchar(50) NOT NULL",
            "`index_id` int(10) unsigned NOT NULL",
            "`created` int(8) NOT NULL",
        ))
        db.run_query(f"CREATE TABLE {qualified_table} ({column_defs})")

        print(f'Database {schema} created')
        print(f'Table {qualified_table} created')
    except Exception as err:
        print(f'There was an error: {err}')


# The text is passed as ``description``; the first positional parameter of
# ArgumentParser is ``prog`` and would replace the program name in the usage line.
parser = argparse.ArgumentParser(
    description='simple utility to create the db and table used by failover_test.py. Usage: ./create_failover_sync_db.py')
parser.add_argument('--debug', action='store_true')
init_logger(debug=parser.parse_args().debug)
log = logging.getLogger()

print('This will destroy and recreate sync database and tracking table')
# Anything other than an explicit "y" (including just pressing Enter) aborts.
if (input("enter y to continue, n to exit [n]: ").strip() or 'n').lower() == 'y':
    db_connection_metadata = collect_user_input()
    db = Database(db_connection_metadata)
    try:
        create_db(db)
    finally:
        # run_query opens the connection lazily; release it before exiting.
        db.close_connection()
else:
    print('exiting...')
sh-4.2$

 

 

h-4.2$ cat database.py
import sys
import pymysql
import logging


class Database:
    """Thin wrapper around a single, lazily opened PyMySQL connection.

    The connection is created on the first ``run_query`` call and reused until
    ``close_connection`` is called; the next query then opens a fresh one.
    """

    def __init__(self, db_connection_metadata):
        """
        :param db_connection_metadata: mapping with the keys ``db_host``,
            ``db_port``, ``db_user``, ``db_password``, ``db_interact_timeout``
            and ``ssl_metadata``
        """
        self.host = db_connection_metadata['db_host']
        self.port = int(db_connection_metadata['db_port'])
        self.username = db_connection_metadata['db_user']
        self.password = db_connection_metadata['db_password']
        self.charset = 'utf8mb4'
        self.cursorclass = pymysql.cursors.DictCursor
        # A single timeout value is applied to connect, read and write.
        timeout = db_connection_metadata['db_interact_timeout']
        self.read_timeout = timeout
        self.write_timeout = timeout
        self.connect_timeout = timeout
        self.ssl_metadata = db_connection_metadata['ssl_metadata']

        self.conn = None

    def open_connection(self):
        """Open the connection if none is currently held; otherwise do nothing."""
        if self.conn is not None:
            return
        logging.debug('opening db connection')
        # NOTE(review): ssl_metadata is forwarded untouched. PyMySQL documents
        # the ``ssl`` argument as a dict of options such as ``ca`` - confirm
        # that the shape built by the caller matches that.
        self.conn = pymysql.connect(
            host=self.host,
            port=self.port,
            user=self.username,
            password=self.password,
            charset=self.charset,
            cursorclass=self.cursorclass,
            read_timeout=self.read_timeout,
            write_timeout=self.write_timeout,
            connect_timeout=self.connect_timeout,
            ssl=self.ssl_metadata
        )
        logging.debug('Connection opened successfully.')

    def run_query(self, query, query_params=None):
        """Execute one statement and return its outcome.

        :param query: SQL text, optionally containing ``%s`` placeholders
        :param query_params: values bound to the placeholders, or None
        :return: a list of rows when the statement produced a result set,
            otherwise the string ``"<n> rows affected."`` after committing
        :raises Exception: ``'Db Connection failed'`` for a
            ``pymysql.OperationalError``, ``'Query failed'`` for any other
            ``pymysql.MySQLError``; the original error is chained as the cause
        """
        try:
            self.open_connection()
            # The cursor is closed by the ``with`` block on every exit path.
            with self.conn.cursor() as cur:
                logging.debug(f'executing query: {query}  params:{query_params}')
                cur.execute(query, query_params)
                # DB-API: description is None when the statement returned no
                # result set. This is independent of keyword case and of where
                # "SELECT" happens to appear in the text (INSERT ... SELECT).
                if cur.description is not None:
                    return list(cur.fetchall())
                self.conn.commit()
                return f"{cur.rowcount} rows affected."
        except pymysql.OperationalError as e:
            # Must come before MySQLError: OperationalError is a subclass of
            # it, so the reverse order makes this handler unreachable.
            print(e)
            raise Exception('Db Connection failed') from e
        except pymysql.MySQLError as e:
            print(e)
            raise Exception('Query failed') from e

    def close_connection(self):
        """Close the held connection, if any, and forget it."""
        if self.conn is None:
            return
        try:
            self.conn.close()
        finally:
            # Forget the handle even if close() raised, so the next query
            # reconnects instead of reusing a half-closed connection.
            self.conn = None
        logging.info('Database connection closed.')
h-4.2$ cat test_run.py
import time

from db_test_meter.database import Database
from db_test_meter.util import log, AppConfig


class TestRun:
    """State and counters for one failover measurement run.

    ``current_phase`` starts as ``'INIT'``, becomes ``'FAILING'`` on the first
    failed attempt and ``'RECOVERED'`` once ``recovery_detected`` sees a
    success after a failure.
    """

    def __init__(self, db: "Database", first_failure_pause_sec: float = 120,
                 max_failed_connects: int = 600):
        """
        :param db: database wrapper providing ``run_query`` and ``close_connection``
        :param first_failure_pause_sec: seconds to sleep right after the first
            failed heartbeat insert (previously hard-coded to 120)
        :param max_failed_connects: number of failed attempts tolerated before
            the process exits with status 1 (previously hard-coded to 600)
        """
        self.db = db
        self.first_failure_pause_sec = first_failure_pause_sec
        self.max_failed_connects = max_failed_connects
        self.success_connect_count: int = 0
        self.failed_connect_count: int = 0
        self.current_phase: str = 'INIT'
        self.prev_loop_end_time: float = 0
        self.failure_condition_start_time: float = 0
        self.failure_condition_end_time: float = 0
        self.heartbeat_index = 0
        self.last_inserted_heartbeat_index = 0

    def _handle_failure(self, error: Exception, pause_on_first: bool = False) -> bool:
        """Record one failed attempt; always returns False unless the cap is hit.

        :param error: the exception that caused the attempt to fail
        :param pause_on_first: sleep ``first_failure_pause_sec`` when this is
            the first failure of the run
        :raises SystemExit: once more than ``max_failed_connects`` attempts failed
        """
        print(f'There was an error: {error}')
        if self.current_phase == 'INIT':
            # first failure of the run: this is where the outage starts
            self.failure_condition_start_time = time.time()
            if pause_on_first and self.first_failure_pause_sec > 0:
                time.sleep(self.first_failure_pause_sec)
        self.current_phase = 'FAILING'
        # we've failed so kill this connection; the next attempt reconnects
        self.db.close_connection()
        self.failed_connect_count += 1
        if self.failed_connect_count <= self.max_failed_connects:
            return False
        log.fatal(f'Maximum Db connection failures of {self.max_failed_connects} occurred, exiting...')
        raise SystemExit(1)

    def test_db_connection(self) -> bool:
        """Probe the database with ``SELECT version()``.

        :return: True on success; False on failure (the failure is counted and
            the connection is dropped so the next probe reconnects)
        """
        try:
            self.db.run_query('SELECT version()')
        except Exception as e:
            return self._handle_failure(e)
        print(f'Connection succeeded at {time.ctime()}')
        self.success_connect_count += 1
        return True

    def get_db_node_hostname(self):
        """Return the ``hostname`` server variable of the connected node.

        :raises Exception: when the query returns no usable row
        """
        query = "SHOW variables LIKE 'hostname'"
        result = self.db.run_query(query)
        if not result or 'Value' not in result[0]:
            raise Exception(f'Unable to retrieve db node hostname with query: {query}')
        db_node_hostname = result[0]["Value"]
        log.debug(f'Db node Hostname: {db_node_hostname}')
        return db_node_hostname

    def db_node_heartbeat(self, test_run_id: str) -> bool:
        """Perform one loop iteration's database attempt.

        While in phase ``'FAILING'`` only a connection probe is made; otherwise
        a heartbeat row with the next ``index_id`` is inserted.

        :param test_run_id: identifier stored with every heartbeat row
        :return: True when the attempt succeeded, False when it failed
        """
        if self.current_phase == 'FAILING':
            return self.test_db_connection()
        try:
            self.heartbeat_index += 1
            self.db.run_query(
                f"INSERT INTO {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} SET test_run_id=%s, index_id=%s, created=UNIX_TIMESTAMP()",
                (test_run_id, self.heartbeat_index,))
            print(f'Insert succeeded at {time.ctime()} test_run_id: {test_run_id}, index_id:{self.heartbeat_index}')
            self.last_inserted_heartbeat_index = self.heartbeat_index
            self.success_connect_count += 1
            return True
        except Exception as e:
            return self._handle_failure(e, pause_on_first=True)

    def recovery_detected(self) -> bool:
        """Return True (and move to ``'RECOVERED'``) when called in phase ``'FAILING'``."""
        if self.current_phase != 'FAILING':
            return False
        # we've recovered
        log.debug('moving from phase FAILING -> RECOVERED')
        self.current_phase = 'RECOVERED'
        return True

    def ensure_minumum_loop_time(self, loop_time_min_in_sec: float, loop_start_time: float, prev_loop_end_time: float):
        """Sleep so that at least ``loop_time_min_in_sec`` separates two iterations.

        Does nothing on the first iteration (``prev_loop_end_time == 0``).
        The misspelled method name is kept because callers use it.
        """
        if prev_loop_end_time == 0:
            return
        log.debug(f'this loop start time: {loop_start_time}')
        log.debug(f'prev loop start end time: {prev_loop_end_time}')
        last_loop_runtime = loop_start_time - prev_loop_end_time
        log.debug(f'last loop runtime: {last_loop_runtime}')
        if last_loop_runtime < loop_time_min_in_sec:
            sleep_time = loop_time_min_in_sec - last_loop_runtime
            log.debug(f'sleeping {sleep_time}')
            time.sleep(sleep_time)

    def get_last_sync_records(self, test_run_id: str, number_of_records: int) -> list:
        """Return the newest ``number_of_records`` heartbeat rows of this run.

        :return: whatever ``run_query`` returns for the SELECT, ordered by
            ``index_id`` descending
        """
        return self.db.run_query(
            f'SELECT * FROM {AppConfig.TEST_DB_NAME}.{AppConfig.TEST_DB_TABLE} WHERE test_run_id = %s ORDER BY `index_id` DESC LIMIT %s',
            (test_run_id, number_of_records))

    def shutdown(self):
        """Release the database connection."""
        self.db.close_connection()
sh-4.2$
sh-4.2$ cat util.py
import getpass
import logging
import os
import sys
import json

import boto3

# Secrets Manager client, created once when this module is imported;
# collect_user_input() uses it to list secrets and fetch the secret value.
client  = boto3.client('secretsmanager')


# Root logger (getLogger() called without a name).
log = logging.getLogger()


class AppConfig:
    """Names of the database and table that hold the sync (heartbeat) records."""

    # database (schema) name
    TEST_DB_NAME = 'db_test_meter'
    # table name, always referenced as <TEST_DB_NAME>.<TEST_DB_TABLE>
    TEST_DB_TABLE = 'db_sync'


def init_logger(debug=False) -> None:
    """Configure the root logger to write to stdout.

    Safe to call more than once: a stdout handler that is already attached is
    reused (and its level updated) instead of stacking a second one, which
    would print every record twice.

    :param debug: log at DEBUG level when true, otherwise at WARNING
    """
    log_level = logging.DEBUG if debug else logging.WARNING
    root = logging.getLogger()
    root.setLevel(log_level)
    for existing in root.handlers:
        if isinstance(existing, logging.StreamHandler) and getattr(existing, 'stream', None) is sys.stdout:
            existing.setLevel(log_level)
            return
    handler = logging.StreamHandler(sys.stdout)
    handler.setLevel(log_level)
    root.addHandler(handler)


def collect_user_input() -> dict:
    """Build the connection metadata used to open the database connection.

    Credentials, host and port are read from the first secret returned by
    ``list_secrets``; the SSL choice and CA bundle path are asked on stdin.
    The process exits with status 1 when no secret is available or the CA
    bundle file does not exist.

    :return: dict with the keys ``ssl_metadata``, ``db_interact_timeout``,
        ``secret_arn``, ``secret_versionId``, ``db_user``, ``db_password``,
        ``db_host`` and ``db_port``
    """
    user_input = {'ssl_metadata': None}
    user_input['db_interact_timeout'] = 1  # 1 sec

    listing = client.list_secrets(MaxResults=1)
    secrets = listing.get('SecretList') or []
    if not secrets:
        # An empty list used to surface as a bare IndexError.
        log.fatal('No secret found in AWS Secrets Manager, exiting...')
        sys.exit(1)
    first_secret = secrets[0]
    user_input['secret_arn'] = first_secret['ARN']
    user_input['secret_versionId'] = first_secret['SecretVersionsToStages']

    secret_value = client.get_secret_value(SecretId=user_input['secret_arn'])
    credentials = json.loads(secret_value['SecretString'])
    user_input['db_user'] = credentials['username']
    user_input['db_password'] = credentials['password']
    user_input['db_host'] = credentials['host']
    user_input['db_port'] = credentials['port']

    using_ssl = input('Connecting over SSL (y/n) [y]: ').strip().lower() or 'y'
    if using_ssl == 'y':
        path_to_ssl_cert = (input('path to ssl cert [./rds-combined-ca-bundle.pem]: ').strip()
                            or './rds-combined-ca-bundle.pem')
        if not os.path.exists(os.path.abspath(path_to_ssl_cert)):
            log.fatal(f'SSL cert not found at: {path_to_ssl_cert}')
            sys.exit(1)
        user_input['ssl_metadata'] = {'ssl': {'ca': path_to_ssl_cert}}
    print(user_input['db_host'])
    return user_input
from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal
import time
import os

# Module-level counter; add_table() keeps its own local `count`, so this one
# is not updated by the functions in this script.
count = 0

# Region of the default boto3 session, reused for the DynamoDB resource.
session = boto3.session.Session()
region = session.region_name

dynamodb = boto3.resource('dynamodb', region_name=region)
table_name = 'Cast'   # table name
pk = 'year'           # partition (HASH) key attribute
sk = 'title'          # sort (RANGE) key attribute
# JSON file read by add_table()
file_name = 'cast_full.json'

def create_table():
    """Create the on-demand DynamoDB table if needed and wait until it exists.

    Only the "table already exists" error is tolerated; any other failure
    (credentials, permissions, throttling, ...) propagates to the caller
    instead of being reported as an existing table.
    """
    try:
        table = dynamodb.create_table(
            TableName=table_name,
            KeySchema=[
                {'AttributeName': pk, 'KeyType': 'HASH'},   # Partition key
                {'AttributeName': sk, 'KeyType': 'RANGE'},  # Sort key
            ],
            # The key attributes are taken from pk/sk so the schema and the
            # definitions cannot drift apart.
            AttributeDefinitions=[
                {'AttributeName': pk, 'AttributeType': 'N'},
                {'AttributeName': sk, 'AttributeType': 'S'},
            ],
            BillingMode='PAY_PER_REQUEST',
        )
    except dynamodb.meta.client.exceptions.ResourceInUseException:
        print("Table exist:Uploading data")
        table = dynamodb.Table(table_name)
    else:
        print("Table status:", table.table_status)
    # create_table returns while the table is still being created; block
    # until it exists so that the load that follows does not hit a missing table.
    table.wait_until_exists()

def add_table():
    """Load every movie record from ``file_name`` into the table via a batch writer.

    Prints a running record count before each item is queued.
    """
    target = dynamodb.Table(table_name)
    with open(file_name) as json_file:
        # floats are parsed as Decimal before being handed to the batch writer
        movies = json.load(json_file, parse_float = decimal.Decimal)
        with target.batch_writer(overwrite_by_pkeys=[pk, sk]) as batch:
            for count, movie in enumerate(movies, start=1):
                title_id, title = movie['titleId'], movie['title']
                year = int(movie['year'])
                genres = movie['genres']
                runtime_minutes = int(movie['runtimeMinutes'])
                cast = movie['cast']
                print("Adding record count:", count)
                record = dict(
                    titleId=title_id,
                    year=year,
                    title=title,
                    genres=genres,
                    runtimeMinutes=runtime_minutes,
                    cast=cast,
                )
                batch.put_item(Item=record)
def main():
    """Run the two steps of the loader in order: table creation, then data upload."""
    for step in (create_table, add_table):
        step()


# Only run the loader when executed as a script, not when imported.
if __name__ == "__main__":
    main()
from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr

# Table to scan and the two attribute names used in the filter below.
table_name = 'Cast'
pk = 'year'
sk = 'title'

# Region of the default boto3 session, reused for the DynamoDB resource.
session = boto3.session.Session()
region = session.region_name

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that emits ``decimal.Decimal`` values as JSON numbers.

    Whole values become ints, values with a fractional part become floats.
    """

    def default(self, o):
        """Convert a Decimal to int/float; defer everything else to the base class."""
        if isinstance(o, decimal.Decimal):
            # ``o % 1`` carries the sign of ``o``; testing ``> 0`` treated
            # negative fractions such as -1.5 as whole numbers and truncated them.
            if o % 1 != 0:
                return float(o)
            return int(o)
        return super(DecimalEncoder, self).default(o)

dynamodb = boto3.resource('dynamodb', region_name=region)
table = dynamodb.Table(table_name)

# Scan inputs: filter on both attributes, project year/title/cast. "year" and
# "cast" are referenced through the #yr / #ca placeholders.
fe = Key(pk).between(1990, 1991) & Key(sk).between('A', 'D')
pe = "#yr, title, #ca"
ean = {"#yr": "year", "#ca": "cast"}
esk = None

# Arguments shared by the first request and every follow-up page.
scan_args = dict(FilterExpression=fe, ProjectionExpression=pe, ExpressionAttributeNames=ean)

# First page: each item is dumped as one JSON document per line.
response = table.scan(**scan_args)
for item in response['Items']:
    print(json.dumps(item, cls=DecimalEncoder))

# Follow-up pages, as long as DynamoDB reports a continuation key.
# NOTE(review): these pages are printed in a different, human-readable format
# than the first page - confirm that this difference is intended.
while 'LastEvaluatedKey' in response:
    response = table.scan(ExclusiveStartKey=response['LastEvaluatedKey'], **scan_args)
    for item in response['Items']:
        print(item['year'], ":", item['title'] + " and the actors are:")
        for actor in item['cast']:
            print(actor['name'])
        print('\n')
from __future__ import print_function # Python 2/3 compatibility
import boto3
import json
import decimal
from boto3.dynamodb.conditions import Key, Attr

# Region of the default boto3 session, reused for the DynamoDB resource.
session = boto3.session.Session()
region = session.region_name
# Table to query and the two attribute names used in the key condition below.
table_name = 'Cast'
pk = 'year'
sk = 'title'

class DecimalEncoder(json.JSONEncoder):
    """JSON encoder that serialises ``decimal.Decimal`` values as strings."""

    def default(self, o):
        """Return ``str(o)`` for a Decimal; defer every other type to the base class."""
        if not isinstance(o, decimal.Decimal):
            return super(DecimalEncoder, self).default(o)
        return str(o)

dynamodb = boto3.resource('dynamodb', region_name=region)

table = dynamodb.Table(table_name)

print("Movies in 2005 - titles A-L, and list of actors")

# "year" and "cast" are referenced through the #yr / #ca placeholders.
query_args = {
    'ProjectionExpression': "#yr, title, #ca",
    'ExpressionAttributeNames': {"#yr": "year", "#ca": "cast"},
    'KeyConditionExpression': Key(pk).eq(2005) & Key(sk).between('A', 'L'),
}

# A single Query call returns one page of results; keep requesting pages
# while DynamoDB hands back a continuation key so no matching movie is dropped.
while True:
    response = table.query(**query_args)
    for i in response['Items']:
        print(i['year'], ":", i['title'] + " and the actors are:")
        for j in i['cast']:
            print(j['name'])
        print('\n')
    if 'LastEvaluatedKey' not in response:
        break
    query_args['ExclusiveStartKey'] = response['LastEvaluatedKey']
from __future__ import print_function

import os
import amazondax
import botocore.session
import boto3

# Region of the default boto3 session, reused for the botocore client.
my_session = boto3.session.Session()
region = my_session.region_name

session = botocore.session.get_session()
dynamodb = session.create_client('dynamodb', region_name=region)  # low-level client

table_name = "TryDaxTable"

# Table definition: numeric partition key "pk", numeric sort key "sk",
# provisioned throughput of 10 read / 10 write capacity units.
params = dict(
    TableName=table_name,
    KeySchema=[
        dict(AttributeName="pk", KeyType="HASH"),   # Partition key
        dict(AttributeName="sk", KeyType="RANGE"),  # Sort key
    ],
    AttributeDefinitions=[
        dict(AttributeName="pk", AttributeType="N"),
        dict(AttributeName="sk", AttributeType="N"),
    ],
    ProvisionedThroughput=dict(ReadCapacityUnits=10, WriteCapacityUnits=10),
)

# Issue the create request
dynamodb.create_table(**params)

# Block until the 'table_exists' waiter reports success, then exit
print('Waiting for', table_name, '...')
waiter = dynamodb.get_waiter('table_exists')
waiter.wait(TableName=table_name)
[ssm-user@ip-10-0-1-89 python]$
from __future__ import print_function

import os, sys, time
import amazondax
import botocore.session
import boto3

# Region of the default boto3 session, reused for both clients.
my_session = boto3.session.Session()
region = my_session.region_name

session = botocore.session.get_session()
dynamodb = session.create_client('dynamodb', region_name=region)  # low-level client

table_name = "TryDaxTable"

# Use a DAX client when an endpoint is given as the first command-line
# argument; otherwise talk to DynamoDB directly.
client = dynamodb
if sys.argv[1:]:
    endpoint = sys.argv[1]
    dax = amazondax.AmazonDaxClient(session, region_name=region, endpoints=[endpoint])
    client = dax

# Key ranges (1..pk, 1..sk) read in every iteration, and the iteration count.
pk = 10
sk = 10
iterations = 50

start = time.time()
for i in range(iterations):
    for ipk in range(1, pk + 1):
        for isk in range(1, sk + 1):
            params = dict(
                TableName=table_name,
                Key={"pk": {'N': str(ipk)}, "sk": {'N': str(isk)}},
            )
            result = client.get_item(**params)
            # one progress dot per request, flushed right away
            print('.', end='', file=sys.stdout)
            sys.stdout.flush()
print()

end = time.time()
elapsed = end - start
# the average is per iteration, i.e. per full pass over the pk x sk key grid
print('Total time: {} sec - Avg time: {} sec'.format(elapsed, elapsed / iterations))
 
 

'AWS > AWS Database' 카테고리의 다른 글

AWS DocumentDB  (0) 2022.04.13
AWS Aurora 데이터베이스  (0) 2022.04.13
AWS RDS TSL/SSL connection  (0) 2022.04.13
AWS DB 실습 동영상  (0) 2022.04.13
Amazon RDS 데이터베이스  (0) 2022.04.12

+ Recent posts