helm_website/docker/deployer_docker/webhook/gitea-webhook-handler.py
2024-12-08 11:49:06 +01:00

350 lines
12 KiB
Python

from flask import Flask, request, jsonify
import hmac
import hashlib
import os
from functools import wraps
import base64
import logging
import subprocess
from datetime import datetime
from enum import Enum
app = Flask(__name__)
# Configuration
# In production, use proper secret management
WEBHOOK_SECRET = os.environ['WEBHOOK_SECRET']
API_USERNAME = os.environ['API_USERNAME']
API_PASSWORD = os.environ['API_PASSWORD']
DATA_DIR = os.environ['DATA_DIR']
GIT_AUTH_MODE = os.environ['GIT_AUTH_MODE']
GIT_ENV = os.environ.copy()
if GIT_AUTH_MODE == 'http':
auth_option = []
elif GIT_AUTH_MODE == 'ssh':
GIT_ENV["GIT_SSH_COMMAND"] = "ssh -i /ssh/ssh-key -o StrictHostKeyChecking=accept-new"
auth_option=[]
else:
raise ValueError("Invalid GIT_AUTH_MODE")
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('webhook.log'),
logging.StreamHandler()
]
)
logger = logging.getLogger(__name__)
class GitProvider(Enum):
GITEA = "gitea"
GITLAB = "gitlab"
UNKNOWN = "unknown"
GITEA_EVENTS = {
'push': 'Push events',
'create': 'Branch or tag creation',
'delete': 'Branch or tag deletion',
'pull_request': 'Pull request events',
'issues': 'Issues events'
}
class GitOperationError(Exception):
"""Custom exception for git operations"""
pass
def detect_git_provider():
"""
Detect whether the webhook is from Gitea or GitLab based on headers
"""
if request.headers.get('X-Gitea-Event'):
return GitProvider.GITEA
elif request.headers.get('X-Gitlab-Event'):
return GitProvider.GITLAB
return GitProvider.UNKNOWN
def verify_signature(f):
@wraps(f)
def decorated_function(*args, **kwargs):
provider = detect_git_provider()
payload_body = request.get_data()
if provider == GitProvider.GITEA:
signature = request.headers.get('X-Gitea-Signature')
if not signature:
return jsonify({'error': 'No Gitea signature provided'}), 401
expected_signature = hmac.new(
WEBHOOK_SECRET.encode('utf-8'),
payload_body,
hashlib.sha256
).hexdigest()
if not hmac.compare_digest(signature, expected_signature):
return jsonify({'error': 'Invalid Gitea signature'}), 401
elif provider == GitProvider.GITLAB:
token = request.headers.get('X-Gitlab-Token')
if not token:
return jsonify({'error': 'No GitLab token provided'}), 401
if not hmac.compare_digest(token, WEBHOOK_SECRET):
return jsonify({'error': 'Invalid GitLab token'}), 401
else:
return jsonify({'error': 'Unknown Git provider'}), 400
return f(*args, **kwargs)
return decorated_function
def execute_git_pull(repo_path):
"""
Execute git pull in the specified repository path
Returns tuple of (success, message)
"""
try:
# Verify the directory exists and is a git repository
if not os.path.exists(repo_path):
raise GitOperationError(f"Directory does not exist: {repo_path}")
if not os.path.exists(os.path.join(repo_path, '.git')):
raise GitOperationError(f"Not a git repository: {repo_path}")
# Create backup of current state
current_commit = subprocess.check_output(
['git', 'rev-parse', 'HEAD'],
cwd=repo_path,
stderr=subprocess.PIPE
).decode().strip()
# Execute git pull
result = subprocess.run(
['git', 'pull'] + auth_option,
cwd=repo_path,
capture_output=True,
env=GIT_ENV,
text=True,
timeout=300 # 5-minute timeout
)
if result.returncode != 0:
raise GitOperationError(f"Git pull failed: {result.stderr}")
new_commit = subprocess.check_output(
['git', 'rev-parse', 'HEAD'],
cwd=repo_path,
stderr=subprocess.PIPE
).decode().strip()
return True, {
'message': 'Git pull successful',
'output': result.stdout,
'previous_commit': current_commit,
'new_commit': new_commit
}
except subprocess.TimeoutExpired:
raise GitOperationError("Git pull operation timed out")
except subprocess.CalledProcessError as e:
raise GitOperationError(f"Git command failed: {e.stderr.decode() if e.stderr else str(e)}")
except Exception as e:
raise GitOperationError(f"Unexpected error: {str(e)}")
def require_basic_auth(f):
@wraps(f)
def decorated_function(*args, **kwargs):
auth_header = request.headers.get('Authorization')
if not auth_header:
return authenticate()
try:
auth_type, auth_string = auth_header.split(' ', 1)
if auth_type.lower() != 'basic':
logger.info("Invalid authentication header '{auth_type}'")
return authenticate()
# Decode base64 credentials
credentials = base64.b64decode(auth_string).decode('utf-8')
username, password = credentials.split(':', 1)
if not (username == API_USERNAME and password == API_PASSWORD):
logger.info(f"Invalid authentication with username='{username}' and password='{password}'")
return authenticate()
except (ValueError, base64.binascii.Error):
logger.info("Invalid authentication encoding")
return authenticate()
return f(*args, **kwargs)
return decorated_function
def authenticate():
"""Sends a 401 response that enables basic auth"""
return jsonify({'error': 'Authentication required'}), 401, {
'WWW-Authenticate': 'Basic realm="Login Required"'
}
def parse_push_event(provider, payload):
"""
Parse push event payload based on the provider
Returns standardized data structure
"""
try:
if provider == GitProvider.GITEA:
return {
'repo_name': payload['repository']['full_name'].split('/')[-1],
'repo_full_name': payload['repository']['full_name'],
'branch': payload['ref'].split('/')[-1],
'commits': payload['commits'],
'default_branch': payload['repository']['default_branch']
}
elif provider == GitProvider.GITLAB:
return {
'repo_name': payload['project']['web_url'].split('/')[-1],
'repo_full_name': payload['project']['name'],
'branch': payload['ref'].split('/')[-1],
'commits': payload['commits'],
'default_branch': payload['project']['default_branch']
}
else:
raise ValueError("Unknown git provider")
except KeyError as e:
raise KeyError(f"Missing required field in payload: {str(e)}")
@app.route('/webhook', methods=['POST'])
@require_basic_auth
@verify_signature
def webhook():
if not request.is_json:
return jsonify({'error': 'Content type must be application/json'}), 400
provider = detect_git_provider()
payload = request.get_json()
# Determine event type based on provider
if provider == GitProvider.GITEA:
event_type = request.headers.get('X-Gitea-Event')
elif provider == GitProvider.GITLAB:
event_type = request.headers.get('X-Gitlab-Event')
else:
return jsonify({'error': 'Unknown git provider'}), 400
# Normalize event types between Gitea and GitLab
if provider == GitProvider.GITLAB and event_type == 'Push Hook':
event_type = 'push'
if event_type.lower() == 'push':
return handle_push(provider, payload)
else:
logger.info(f"Received unhandled event type: {event_type} from {provider.value}")
return jsonify({'status': 'received', 'event': event_type, 'provider': provider.value}), 200
def handle_push(provider, payload):
"""Handle push events"""
try:
# Parse the payload according to the provider's format
data = parse_push_event(provider, payload)
logger.info(f"Push to {data['repo_full_name']} on branch {data['branch']} via {provider.value}")
logger.info(f"Number of commits: {len(data['commits'])}")
# Only process pushes to the default branch
if data['branch'] != data['default_branch']:
logger.info(f"Skipping pull for non-default branch: {data['branch']}")
return jsonify({
'status': 'skipped',
'event': 'push',
'provider': provider.value,
'reason': f"Push was to non-default branch {data['branch']}"
}), 200
# Construct the repository path
repo_path = DATA_DIR #os.path.join(DATA_DIR, data['repo_name'])
try:
success, result = execute_git_pull(repo_path)
logger.info(f"Git pull result for {data['repo_name']}: {result['message']}")
response_data = {
'status': 'success',
'event': 'push',
'provider': provider.value,
'repo': data['repo_full_name'],
'branch': data['branch'],
'commits_count': len(data['commits']),
'git_pull': result
}
except GitOperationError as e:
logger.error(f"Git pull failed for {data['repo_name']}: {str(e)}")
response_data = {
'status': 'error',
'event': 'push',
'provider': provider.value,
'repo': data['repo_full_name'],
'branch': data['branch'],
'commits_count': len(data['commits']),
'error': str(e)
}
return jsonify(response_data), 200
except KeyError as e:
logger.error(f"Invalid payload structure: {str(e)}")
return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400
#def handle_pull_request(payload):
# """Handle pull request events"""
# try:
# action = payload['action']
# pr_number = payload['number']
# repo = payload['repository']['full_name']
#
# logger.info(f"Pull request #{pr_number} {action} in {repo}")
#
# return jsonify({
# 'status': 'success',
# 'event': 'pull_request',
# 'action': action,
# 'pr_number': pr_number,
# 'repo': repo
# }), 200
# except KeyError as e:
# logger.error(f"Invalid payload structure: {str(e)}")
# return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400
#
#def handle_issues(payload):
# """Handle issues events"""
# try:
# action = payload['action']
# issue_number = payload['issue']['number']
# repo = payload['repository']['full_name']
#
# logger.info(f"Issue #{issue_number} {action} in {repo}")
#
# return jsonify({
# 'status': 'success',
# 'event': 'issues',
# 'action': action,
# 'issue_number': issue_number,
# 'repo': repo
# }), 200
# except KeyError as e:
# logger.error(f"Invalid payload structure: {str(e)}")
# return jsonify({'error': f'Invalid payload structure: {str(e)}'}), 400
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)