name: IP Quality Score version: '1.1' icon:  script: type: python test_connection_code: | import os import sys import urllib3 import argparse import requests # disabling all urllib3 warnings. urllib3.disable_warnings() # For security reason all action params are passed to docker container as Environment variable with variable name equal # to the id specified into yaml. we need to have a class to manage Environment variable class EnvDefault(argparse.Action): def __init__(self, required=True, default=None, **kwargs): env_var = kwargs.get("dest") default = os.environ.get(env_var, default) if env_var in os.environ else default required = False if required and default else required super(EnvDefault, self).__init__(default=default, required=required, **kwargs) def __call__(self, parser, namespace, values, option_string=None): setattr(namespace, self.dest, values) try: # Object for parsing command line strings into Python objects. parsers = argparse.ArgumentParser() parsers.add_argument('--api_url', help='API URL, REQUIRED', required=True, action=EnvDefault) parsers.add_argument('--api_key', help='API Key, REQUIRED', required=True, action=EnvDefault) parsers.add_argument('--timeout', help='timeout', required=False, action=EnvDefault) parsers.add_argument('--verify', help='verify', required=False, action=EnvDefault) parsers.add_argument('--proxy_url', help='proxy_url', required=False, action=EnvDefault) args, unknown = parsers.parse_known_args() # protocol and hostname to the URL of the proxy. proxies = {'http': args.proxy_url, 'https': args.proxy_url} if args.proxy_url is not None else None try: # How long to wait for the server to send data before giving up, as a float. timeout = float(args.timeout) except (ValueError, TypeError): timeout = 180.0 # this controls whether we verify the server's TLS certificate or not verify = args.verify == 'true' # API endpoint details at https://www.ipqualityscore.com/documentation/usage/overview endpoint = "{host}/api/json/account/{api_key}".format(host=args.api_url.rstrip('/'), api_key=args.api_key) session = requests.Session() # Sends a GET request response = session.get(endpoint, timeout=timeout, verify=verify, proxies=proxies) # HTTPError if one occurred response.raise_for_status() # Returns the json-encoded content of a response response = response.json() # check if the 'success' is True or False, was the request successful? success = response.get("success") if not success: # When response 'success' is False, we need to get correct error message err_msg = response.get("message") if err_msg is not None: # raise an in case we proved wrong API key raise Exception(str(err_msg)) # just exit in case we do not have any error, and we do not have any issue exit(0) except Exception as e: # write an Exception sys.stderr.write(str(e)) exit(-1) docker_repo_tag: 'python3_generic:latest' configuration: testable_connection: true require_proxy_config: true data_attributes: api_url: label: 'API URL' type: 'text' required: true default: "https://www.ipqualityscore.com/" hint: "https://www.ipqualityscore.com/" api_key: label: 'API Key' type: password required: true timeout: label: 'Connection Timeout (s)' type: text required: false validator: integer default: '120' verify: label: 'Verify Server Certificate' type: checkbox required: false