Snippets: Python: Initscript-like scripts for Gunicorn with Conda: runner.py

 20th August 2020 at 2:19pm
#!#INSTALL_PATH/bin/python
# coding=utf8

import argparse
import filecmp
import json
import logging
import os
from pathlib import Path
import shutil
import signal
import subprocess
import sys
import time

# File name of the gunicorn config file that main() requires under the project root.
GUNICORN_CONF = 'gunicorn_conf.py'
# Path of the conda executable. NOTE(review): '#INSTALL_PATH' looks like a
# placeholder that is substituted when this script is installed — confirm.
CONDA_BIN = '#INSTALL_PATH/bin/conda'
# Conda package specs that prepare_conda_env() installs into the project env
# in addition to whatever environment.yaml declares.
SUPPLEMENT_PACKAGES = ['gunicorn=19.7.*', 'gevent=1.2.*', 'setproctitle=1.1.*']

# Logger used by every function in this script; setup_logging() attaches its handler.
logger = logging.getLogger('runner')


def setup_logging():
    """Attach a stream handler to the module-level ``logger``.

    Both the logger and the handler let DEBUG and above through, and every
    record is rendered as ``[LEVEL] message``.
    """
    console = logging.StreamHandler()
    console.setLevel(logging.DEBUG)
    console.setFormatter(logging.Formatter('[%(levelname)s] %(message)s'))

    logger.setLevel(logging.DEBUG)
    logger.addHandler(console)


def join_args(args):
    """Render command arguments (a mix of ``Path`` objects and strings) as one space-separated string."""
    return ' '.join(map(str, args))


def get_project_name(project_root):
    """Return the project name encoded in the release directory name.

    The directory is expected to be named ``<project>-<version>``; for
    example ``/usr/local/services/radio_api_server-1.0`` yields
    ``radio_api_server``. Project names on the Zhiyun package release
    platform may themselves contain hyphens and underscores, so only the
    last hyphen-separated component is dropped.

    Args:
        project_root: ``Path`` of the project's root directory.

    Returns:
        The directory name without its trailing ``-<version>`` component.
        A directory name that contains no hyphen at all is returned
        unchanged rather than as an empty string, because the result is
        used as the conda env name.
    """
    dir_name = project_root.name
    name, hyphen, _version = dir_name.rpartition('-')
    # No hyphen means there is no version suffix to strip.
    return name if hyphen else dir_name


def test_is_gunicorn_process(pid):
    """Tell whether the process with the given pid reports a gunicorn command name.

    The name is read straight from ``/proc/<pid>/comm`` instead of going
    through psutil, so the script runs with the standard library only.
    Reading that file raises an ``OSError`` subclass when it does not exist.
    """
    command_name = Path(f'/proc/{pid}/comm').read_text().strip()

    # comm is truncated by the kernel (to 16 bytes, per the original note),
    # so only the prefix can be compared.
    return command_name.startswith('gunicorn')


def get_conda_env_dir():
    """Return conda's first environments directory as a ``Path``.

    Runs ``CONDA_BIN info --json`` and returns the first entry of the
    ``envs_dirs`` list found in its JSON output.

    stderr is captured separately from stdout so that anything conda prints
    there cannot end up inside the JSON document being parsed. When conda
    exits with a non-zero status the failure is logged, together with the
    captured stderr, and the script exits with status 1 — the same handling
    the other conda invocations in this file use.
    """
    conda_args = [CONDA_BIN, 'info', '--json']
    result = subprocess.run(conda_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

    if result.returncode != 0:
        logger.error(
            f"Query conda info failed. args: {join_args(conda_args)}, return {result.returncode}."
            f"\n{result.stderr.decode(errors='replace')}"
        )
        sys.exit(1)

    conda_info = json.loads(result.stdout)
    return Path(conda_info['envs_dirs'][0])


def _run_conda_step(conda_args, failure_message):
    """Run one conda command, log its combined output, and exit(1) with failure_message if it fails."""
    result = subprocess.run(conda_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
    # errors='replace' keeps undecodable bytes in conda's output from aborting the run.
    logger.info(f"\n{result.stdout.decode(errors='replace')}\n")

    if result.returncode != 0:
        logger.error(f"{failure_message} args: {join_args(conda_args)}, return {result.returncode}.")
        sys.exit(1)


def prepare_conda_env(project_env_dir, project_root, project_name):
    """Create or update the project's conda env and install the gunicorn extras.

    Behaviour, in order:

    * If ``project_env_dir`` exists and ``environment.yaml`` has the same
      content as ``environment.yaml.latest`` (the copy saved by the previous
      successful run), nothing is done at all.
    * If ``project_env_dir`` exists otherwise, the env is updated from
      ``environment.yaml``.
    * If ``project_env_dir`` does not exist, the env is created from
      ``environment.yaml``.
    * After a create/update, ``SUPPLEMENT_PACKAGES`` are installed into the
      env and ``environment.yaml`` is copied to ``environment.yaml.latest``.

    Any conda command that returns a non-zero status is logged and ends the
    script with exit status 1.

    Args:
        project_env_dir: ``Path`` of the conda env directory for this project.
        project_root: ``Path`` of the project root holding ``environment.yaml``.
        project_name: name of the conda env (passed to conda via ``-n``).
    """
    env_file = project_root / 'environment.yaml'
    previous_env_file = project_root / 'environment.yaml.latest'

    if project_env_dir.exists():
        # shallow=False forces a content comparison instead of trusting
        # matching os.stat() signatures.
        if previous_env_file.exists() and filecmp.cmp(env_file, previous_env_file, shallow=False):
            logger.info("environment.yaml is the same during last deployment. No need to update conda env.")
            return

        logger.info('environment.yaml is changed during last deployment. Updating conda env...')
        _run_conda_step(
            [CONDA_BIN, 'env', 'update', '-n', project_name, '-f', env_file],
            'Update conda env failed.',
        )
        logger.info('Update conda env succeed.')
    else:
        logger.info('Creating conda env...')
        _run_conda_step(
            [CONDA_BIN, 'env', 'create', '-n', project_name, '-f', env_file],
            # Previously this branch reported "Update conda env failed."
            'Create conda env failed.',
        )
        logger.info('Create conda env succeed.')

    logger.info(f'Installing gunicorn related packages: {SUPPLEMENT_PACKAGES}...')
    _run_conda_step(
        [CONDA_BIN, 'install', '-n', project_name, '-y', *SUPPLEMENT_PACKAGES],
        'Install supplement pacakges failed.',
    )
    logger.info('Install supplement pacakges succeed.')

    # Remember what was deployed so the next run can skip the env update.
    shutil.copyfile(env_file, previous_env_file)


def _read_pid_file(pid_file_path):
    """Return the integer pid stored in pid_file_path, or None when its content is not an integer."""
    try:
        # read_text() closes the file itself, unlike a bare open().read().
        return int(pid_file_path.read_text())
    except ValueError:
        return None


def deal_with_gunicorn(project_root, project_name, project_env_dir, app_module, command):
    """Start, stop, restart or reload the project's gunicorn daemon.

    The pid file and the access/error logs live in ``<project_root>/log``.

    * ``start``: refuses to start (exit 1) when the pid file points at a live
      gunicorn process; a pid file that is stale or unreadable only produces
      a warning. gunicorn is then launched with ``--daemon``.
    * ``stop`` / ``reload``: require a pid file that points at a live
      gunicorn process (exit 1 otherwise) and send it SIGTERM / SIGHUP.
    * ``restart``: ``stop``, sleep five seconds, ``start``. Because ``stop``
      exits when gunicorn is not running, so does ``restart``.

    Any other command is ignored.

    Args:
        project_root: ``Path`` of the project root.
        project_name: process name passed to gunicorn via ``-n``.
        project_env_dir: ``Path`` of the conda env holding ``bin/gunicorn``.
        app_module: gunicorn application module.
        command: one of ``start``, ``stop``, ``restart``, ``reload``.
    """
    # TODO: log rotation / retention
    var_path = project_root / 'log'
    pid_file_path = var_path / 'gunicorn.pid'
    access_log_path = var_path / 'access.log'
    error_log_path = var_path / 'error.log'

    gunicorn_bin = project_env_dir / 'bin' / 'gunicorn'
    # Use the shared constant so this always matches the file main() checks for.
    gunicorn_conf_file_path = project_root / GUNICORN_CONF

    if command == 'start':
        if pid_file_path.exists():
            pid = _read_pid_file(pid_file_path)
            if pid is None:
                logger.warning('gunicorn pid file exists, but does not contain a valid pid.')
            else:
                try:
                    is_gunicorn = test_is_gunicorn_process(pid)
                except IOError:
                    logger.warning('gunicorn pid file exists, but process is gone.')
                else:
                    if is_gunicorn:
                        logger.error('gunicorn has already started before.')
                        sys.exit(1)

        start_args = [
            gunicorn_bin, '-c', gunicorn_conf_file_path, '-n', project_name,
            '--pythonpath', project_root, '-p', pid_file_path, '--daemon',
            '--access-logfile', access_log_path, '--error-logfile', error_log_path,
            '--capture-output', app_module,
        ]

        # Announce the launch before running it, not after.
        logger.info('Starting gunicorn...')
        result = subprocess.run(start_args, stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        logger.info(f"\n{result.stdout.decode(errors='replace')}\n")

        if result.returncode != 0:
            logger.error(f"Start gunicorn failed. args: {join_args(start_args)}, return {result.returncode}.")
            sys.exit(1)

        logger.info(f"Start gunicorn finished. If things aren't going well, check {error_log_path}.")

    elif command == 'restart':
        deal_with_gunicorn(project_root, project_name, project_env_dir, app_module, 'stop')

        logger.info("Wait a second for gunicorn to exit...")
        time.sleep(5.0)

        deal_with_gunicorn(project_root, project_name, project_env_dir, app_module, 'start')

    elif command in ['reload', 'stop']:
        if not pid_file_path.exists():
            logger.error("gunicorn pid file is missing. May be it't not running?")
            sys.exit(1)

        pid = _read_pid_file(pid_file_path)
        if pid is None:
            logger.error(f'gunicorn pid file {pid_file_path} does not contain a valid pid. Need manual operation.')
            sys.exit(1)

        try:
            is_gunicorn = test_is_gunicorn_process(pid)
        except IOError:
            logger.error('gunicorn pid file exists, but process is gone.')
            sys.exit(1)

        # Never signal a process that merely reuses the recorded pid.
        if not is_gunicorn:
            logger.error(f'process name of pid {pid} is not gunicorn. Need manual operation.')
            sys.exit(1)

        signal_to_send = dict(reload=signal.SIGHUP, stop=signal.SIGTERM)[command]
        logger.info(f'Sending signal {signal_to_send} to {command} gunicorn process...')
        os.kill(pid, signal_to_send)

        logger.info(f"{command.capitalize()} gunicorn finished. If things aren't going well, check {error_log_path}.")


def main(project_root, command, app_module):
    """Check the project layout, prepare its conda env, then run ``command``.

    Exits with status 1 when ``environment.yaml`` or the gunicorn config file
    is missing from ``project_root``. For the ``prepare`` command the script
    exits with status 0 right after the conda env has been prepared; every
    other command is handed to ``deal_with_gunicorn``.

    Args:
        project_root: ``Path`` of the project root directory.
        command: the command chosen on the command line.
        app_module: gunicorn application module.
    """
    setup_logging()

    # Both files must be present before any conda work is attempted.
    required_env_file = project_root / 'environment.yaml'
    if not required_env_file.exists():
        logger.error(f'{required_env_file} is missing. We need this file to prepare conda env.')
        sys.exit(1)

    conf_path = project_root / GUNICORN_CONF
    if not conf_path.exists():
        logger.error(f'{conf_path} is missing. We need this file to run gunicorn.')
        sys.exit(1)

    # The conda env is named after the project and lives under conda's envs dir.
    name = get_project_name(project_root)
    env_dir = get_conda_env_dir() / name
    prepare_conda_env(env_dir, project_root, name)

    if command == 'prepare':
        sys.exit(0)

    deal_with_gunicorn(project_root, name, env_dir, app_module, command)


if __name__ == '__main__':
    # Script entry point: parse the command line and hand over to main().
    description = """Task runner based on conda env and config files.

`command' argument should be one of the following:
    start   - start the app, failed if already started
    stop    - stop the app, failed if not running
    restart - basically stop and start
    reload  - restart the app graceful (if applicable)
    prepare - just prepare the conda env and don't do anything else
"""

    parser = argparse.ArgumentParser(
        # The raw formatter keeps the command table's line breaks in --help.
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description=description,
    )
    parser.add_argument('-p', required=True, help='project root dir')
    # -t is required and validated, but its value is not passed on to main().
    parser.add_argument('-t', required=True, choices=['gunicorn'], help='run type')
    parser.add_argument('--app-module', required=True, help='app module for gunicorn')
    parser.add_argument('command', choices=['start', 'stop', 'restart', 'reload', 'prepare'])

    args = parser.parse_args()

    main(project_root=Path(args.p), command=args.command, app_module=args.app_module)