from flask import Flask, request, jsonify import hmac import hashlib import os from functools import wraps import base64 import logging import subprocess from datetime import datetime from enum import Enum app = Flask(__name__) # Configuration # In production, use proper secret management WEBHOOK_SECRET = os.environ['WEBHOOK_SECRET'] API_USERNAME = os.environ['API_USERNAME'] API_PASSWORD = os.environ['API_PASSWORD'] DATA_DIR = os.environ['DATA_DIR'] GIT_AUTH_MODE = os.environ['GIT_AUTH_MODE'] if GIT_AUTH_MODE == 'http': auth_option = [] elif GIT_AUTH_MODE == 'ssh': auth_option = ['--config core.sshCommand="ssh -i /ssh/ssh-key" -o StrictHostKeyChecking=accept-new'] else: raise ValueError("Invalid GIT_AUTH_MODE") # Setup logging logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('webhook.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) class GitProvider(Enum): GITEA = "gitea" GITLAB = "gitlab" UNKNOWN = "unknown" GITEA_EVENTS = { 'push': 'Push events', 'create': 'Branch or tag creation', 'delete': 'Branch or tag deletion', 'pull_request': 'Pull request events', 'issues': 'Issues events' } class GitOperationError(Exception): """Custom exception for git operations""" pass def detect_git_provider(): """ Detect whether the webhook is from Gitea or GitLab based on headers """ if request.headers.get('X-Gitea-Event'): return GitProvider.GITEA elif request.headers.get('X-Gitlab-Event'): return GitProvider.GITLAB return GitProvider.UNKNOWN def verify_signature(f): @wraps(f) def decorated_function(*args, **kwargs): provider = detect_git_provider() payload_body = request.get_data() if provider == GitProvider.GITEA: signature = request.headers.get('X-Gitea-Signature') if not signature: return jsonify({'error': 'No Gitea signature provided'}), 401 expected_signature = hmac.new( WEBHOOK_SECRET.encode('utf-8'), payload_body, hashlib.sha256 ).hexdigest() if not hmac.compare_digest(signature, expected_signature): return jsonify({'error': 'Invalid Gitea signature'}), 401 elif provider == GitProvider.GITLAB: token = request.headers.get('X-Gitlab-Token') if not token: return jsonify({'error': 'No GitLab token provided'}), 401 if not hmac.compare_digest(token, WEBHOOK_SECRET): return jsonify({'error': 'Invalid GitLab token'}), 401 else: return jsonify({'error': 'Unknown Git provider'}), 400 return f(*args, **kwargs) return decorated_function def execute_git_pull(repo_path): """ Execute git pull in the specified repository path Returns tuple of (success, message) """ try: # Verify the directory exists and is a git repository if not os.path.exists(repo_path): raise GitOperationError(f"Directory does not exist: {repo_path}") if not os.path.exists(os.path.join(repo_path, '.git')): raise GitOperationError(f"Not a git repository: {repo_path}") # Create backup of current state current_commit = subprocess.check_output( ['git', 'rev-parse', 'HEAD'], cwd=repo_path, stderr=subprocess.PIPE ).decode().strip() # Execute git pull result = subprocess.run( ['git', 'pull'] + auth_option, cwd=repo_path, capture_output=True, text=True, timeout=300 # 5-minute timeout ) if result.returncode != 0: raise GitOperationError(f"Git pull failed: {result.stderr}") new_commit = subprocess.check_output( ['git', 'rev-parse', 'HEAD'], cwd=repo_path, stderr=subprocess.PIPE ).decode().strip() return True, { 'message': 'Git pull successful', 'output': result.stdout, 'previous_commit': current_commit, 'new_commit': new_commit } except subprocess.TimeoutExpired: raise GitOperationError("Git pull operation timed out") except subprocess.CalledProcessError as e: raise GitOperationError(f"Git command failed: {e.stderr.decode() if e.stderr else str(e)}") except Exception as e: raise GitOperationError(f"Unexpected error: {str(e)}") def require_basic_auth(f): @wraps(f) def decorated_function(*args, **kwargs): auth_header = request.headers.get('Authorization') if not auth_header: return authenticate() try: auth_type, auth_string = auth_header.split(' ', 1) if auth_type.lower() != 'basic': logger.info("Invalid authentication header '{auth_type}'") return authenticate() # Decode base64 credentials credentials = base64.b64decode(auth_string).decode('utf-8') username, password = credentials.split(':', 1) if not (username == API_USERNAME and password == API_PASSWORD): logger.info(f"Invalid authentication with username='{username}' and password='{password}'") return authenticate() except (ValueError, base64.binascii.Error): logger.info("Invalid authentication encoding") return authenticate() return f(*args, **kwargs) return decorated_function def authenticate(): """Sends a 401 response that enables basic auth""" return jsonify({'error': 'Authentication required'}), 401, { 'WWW-Authenticate': 'Basic realm="Login Required"' } def parse_push_event(provider, payload): """ Parse push event payload based on the provider Returns standardized data structure """ try: if provider == GitProvider.GITEA: return { 'repo_name': payload['repository']['full_name'].split('/')[-1], 'repo_full_name': payload['repository']['full_name'], 'branch': payload['ref'].split('/')[-1], 'commits': payload['commits'], 'default_branch': payload['repository']['default_branch'] } elif provider == GitProvider.GITLAB: return { 'repo_name': payload['project']['web_url'].split('/')[-1], 'repo_full_name': payload['project']['name'], 'branch': payload['ref'].split('/')[-1], 'commits': payload['commits'], 'default_branch': payload['project']['default_branch'] } else: raise ValueError("Unknown git provider") except KeyError as e: raise KeyError(f"Missing required field in payload: {str(e)}") @app.route('/webhook', methods=['POST']) @require_basic_auth @verify_signature def webhook(): if not request.is_json: return jsonify({'error': 'Content type must be application/json'}), 400 provider = detect_git_provider() payload = request.get_json() # Determine event type based on provider if provider == GitProvider.GITEA: event_type = request.headers.get('X-Gitea-Event') elif provider == GitProvider.GITLAB: event_type = request.headers.get('X-Gitlab-Event') else: return jsonify({'error': 'Unknown git provider'}), 400 # Normalize event types between Gitea and GitLab if provider == GitProvider.GITLAB and event_type == 'Push Hook': event_type = 'push' if event_type.lower() == 'push': return handle_push(provider, payload) else: logger.info(f"Received unhandled event type: {event_type} from {provider.value}") return jsonify({'status': 'received', 'event': event_type, 'provider': provider.value}), 200 def handle_push(provider, payload): """Handle push events""" try: # Parse the payload according to the provider's format data = parse_push_event(provider, payload) logger.info(f"Push to {data['repo_full_name']} on branch {data['branch']} via {provider.value}") logger.info(f"Number of commits: {len(data['commits'])}") # Only process pushes to the default branch if data['branch'] != data['default_branch']: logger.info(f"Skipping pull for non-default branch: {data['branch']}") return jsonify({ 'status': 'skipped', 'event': 'push', 'provider': provider.value, 'reason': f"Push was to non-default branch {data['branch']}" }), 200 # Construct the repository path repo_path = DATA_DIR #os.path.join(DATA_DIR, data['repo_name']) try: success, result = execute_git_pull(repo_path) logger.info(f"Git pull result for {data['repo_name']}: {result['message']}") response_data = { 'status': 'success', 'event': 'push', 'provider': provider.value, 'repo': data['repo_full_name'], 'branch': data['branch'], 'commits_count': len(data['commits']), 'git_pull': result } except GitOperationError as e: logger.error(f"Git pull failed for {data['repo_name']}: {str(e)}") response_data = { 'status': 'error', 'event': 'push', 'provider': provider.value, 'repo': data['repo_full_name'], 'branch': data['branch'], 'commits_count': len(data['commits']), 'error': str(e) } return jsonify(response_data), 200 except KeyError as e: logger.error(f"Invalid payload structure: {str(e)}") return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400 #def handle_pull_request(payload): # """Handle pull request events""" # try: # action = payload['action'] # pr_number = payload['number'] # repo = payload['repository']['full_name'] # # logger.info(f"Pull request #{pr_number} {action} in {repo}") # # return jsonify({ # 'status': 'success', # 'event': 'pull_request', # 'action': action, # 'pr_number': pr_number, # 'repo': repo # }), 200 # except KeyError as e: # logger.error(f"Invalid payload structure: {str(e)}") # return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400 # #def handle_issues(payload): # """Handle issues events""" # try: # action = payload['action'] # issue_number = payload['issue']['number'] # repo = payload['repository']['full_name'] # # logger.info(f"Issue #{issue_number} {action} in {repo}") # # return jsonify({ # 'status': 'success', # 'event': 'issues', # 'action': action, # 'issue_number': issue_number, # 'repo': repo # }), 200 # except KeyError as e: # logger.error(f"Invalid payload structure: {str(e)}") # return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400 if __name__ == '__main__': app.run(host='0.0.0.0', port=5000)