Source code for k2hr3_osnl

# -*- coding: utf-8 -*-
#
# K2HR3 OpenStack Notification Listener
#
# Copyright 2018 Yahoo Japan Corporation
#
# K2HR3 is K2hdkc based Resource and Roles and policy Rules, gathers
# common management information for the cloud.
# K2HR3 can dynamically manage information as "who", "what", "operate".
# These are stored as roles, resources, policies in K2hdkc, and the
# client system can dynamically read and modify these information.
#
# For the full copyright and license information, please view
# the licenses file that was distributed with this source code.
#
# AUTHOR:   Hirotaka Wakabayashi
# CREATE:   Tue Sep 11 2018
# REVISION:
#

"""K2hr3 OpenStack Notification message Listener."""
from __future__ import (absolute_import, division, print_function,
                        unicode_literals)

__all__ = [
    'K2hr3Conf',
    'K2hr3ConfError',
    'K2hr3NotificationEndpoint',
    'K2hr3NotificationEndpointError',
    'listen',
    'main',
    'version',
]
__author__ = 'Hirotaka Wakabayashi <hiwakaba@yahoo-corp.jp>'
__version__ = '1.0.6'

import argparse
import logging
from logging.handlers import TimedRotatingFileHandler
from logging import StreamHandler
from pathlib import Path
import sys
import time
from typing import List, Set, Dict, Tuple, Optional  # noqa: pylint: disable=unused-import

import oslo_config  # type: ignore
import oslo_messaging  # type: ignore

from k2hr3_osnl.cfg import K2hr3Conf
from k2hr3_osnl.exceptions import K2hr3Error, K2hr3ConfError, K2hr3NotificationEndpointError
from k2hr3_osnl.endpoint import K2hr3NotificationEndpoint

LOG = logging.getLogger(__name__)

if sys.platform.startswith('win'):
    raise ImportError(r'Currently we do not test well on windows')


[docs]def version() -> str: """Returns a version of k2hr3_osnl package. :returns: version :rtype: str """ return __version__
[docs]def main() -> int: """Runs a oslo_messaging notification listener for k2hr3. You can configure the listener by the config file. Simple usage: $ k2hr3_osnl -c etc/k2hr3_osnl.config :returns: 0 if success, otherwise 1. :rtype: int """ parser = argparse.ArgumentParser( description='An oslo.messaging notification listener for k2hr3.') parser.add_argument( '-c', '--config-file', dest='config_file', default='/etc/k2hr3/k2hr3_osnl.conf', help='config file path') parser.add_argument( '-d', dest='debug_level', choices=('debug', 'info', 'warn', 'error', 'critical'), help='debug level. default: defined in the config_file') parser.add_argument( '-l', dest='libs_debug_level', choices=('debug', 'info', 'warn', 'error', 'critical'), help='dependent libraries loglevel. default: defined in the config_file' ) parser.add_argument( '-f', dest='log_file', help='log file path. default: defined in the config_file') parser.add_argument( '-v', action='version', version='%(prog)s ' + __version__) args = parser.parse_args() try: conf = K2hr3Conf(Path(args.config_file)) _configure_logger(args, conf) # logger configured by args and conf. endpoints = [K2hr3NotificationEndpoint(conf)] sys.exit(listen(endpoints)) except K2hr3Error as error: LOG.error('K2hr3Error error, %s', error) raise K2hr3Error("K2hr3 RuntimeError") from error except Exception as error: LOG.error('Unknown error, %s', error) raise RuntimeError("Unknown RuntimeError") from error
_nametolevel = { 'error': logging.ERROR, 'warn': logging.WARNING, 'info': logging.INFO, 'debug': logging.DEBUG, 'notset': logging.NOTSET } def _configure_logger(args, conf) -> bool: """Configures logger settings by args and conf. :param args: command line args :type argparse: command line args :param conf: configuration :type K2hr3Conf: configuration :returns: True if success, otherwise False :rtype: bool """ # We prefer args than configuration file. # 1. debug_level debug_level = logging.WARNING if args.debug_level is not None: debug_level = _nametolevel.get(args.debug_level, logging.WARNING) else: debug_level = _nametolevel.get(conf.debug_level, logging.WARNING) LOG.setLevel(debug_level) # 2. formatter formatter = logging.Formatter( '%(asctime)-15s %(levelname)s %(name)s:%(lineno)d %(message)s') # hardcoding # 3. log_file if args.log_file is not None: # check the permission of the destination file. # if unable to open it, use default(stderr). # Add the log message handler to the logger handler = TimedRotatingFileHandler( args.log_file, when='midnight', encoding='UTF-8', backupCount=31) handler.setFormatter(formatter) LOG.addHandler(handler) else: if conf.log_file == 'sys.stderr': stream_handler = StreamHandler(sys.stderr) stream_handler.setFormatter(formatter) LOG.addHandler(stream_handler) else: # Add the log message handler to the logger handler = TimedRotatingFileHandler( conf.log_file, when='midnight', encoding='UTF-8', backupCount=31) handler.setFormatter(formatter) LOG.addHandler(handler) # 3. libs_debug_level libs_debug_level = logging.WARNING if args.libs_debug_level is not None: libs_debug_level = _nametolevel.get(args.libs_debug_level, logging.WARNING) else: libs_debug_level = _nametolevel.get(conf.libs_debug_level, logging.WARNING) libs = [ 'stevedore.extension', 'oslo.messaging._drivers.pool', 'oslo.messaging._drivers.impl_rabbit', 'amqp' ] for i in libs: logging.getLogger(i).setLevel(libs_debug_level) return True
[docs]def listen(endpoints: List[K2hr3NotificationEndpoint]) -> int: """Runs a oslo_messaging notification listener for k2hr3. This function is a library endpoint to start a oslo_messaging notification listener for k2hr3. :param endpoints: endpoint to be called by dispatcher when notification messages arrive. :type endpoints: list of K2hr3NotificationEndpoint :returns: 0 if success, otherwise 1. :rtype: int """ # 1. validate endpoints if not isinstance(endpoints, list) or len(endpoints) == 0: LOG.error('invalid endpoints, %s', endpoints) return 1 # 2. validate each endpoint for endpoint in endpoints: if not isinstance(endpoint, K2hr3NotificationEndpoint): LOG.error('found an invalid endpoint, %s', endpoint) return 1 if not isinstance(endpoint.conf, K2hr3Conf): # this never happens. LOG.error('found an invalid conf in an endpoint, %s', endpoint.conf) return 1 conf = endpoint.conf assert isinstance(conf, K2hr3Conf) try: # transport, targets transport = oslo_messaging.get_notification_transport( oslo_config.cfg.CONF, url=conf.oslo_messaging_notifications.transport_url) targets = [ oslo_messaging.Target( topic=conf.oslo_messaging_notifications.topic, exchange=conf.oslo_messaging_notifications.exchange) ] listener = oslo_messaging.get_notification_listener( transport, targets, endpoints, pool=conf.oslo_messaging_notifications.pool, executor=conf.oslo_messaging_notifications.executor, allow_requeue=conf.oslo_messaging_notifications.allow_requeue) listener.start() LOG.info('Starting') while True: time.sleep(1) except KeyboardInterrupt: LOG.info('Stopping') listener.stop() listener.wait() except NotImplementedError: LOG.error('allow_requeue is not supported by driver') return 1 except oslo_messaging.ServerListenError as error: LOG.error('listener error, %s', error.msg) return 1 return 0
if __name__ == "__main__": sys.exit(main()) # # EOF # # # Local variables: # tab-width: 4 # c-basic-offset: 4 # End: # vim600: noexpandtab sw=4 ts=4 fdm=marker # vim<600: noexpandtab sw=4 ts=4 #