# Copyright 2026 The Gitea Authors. All rights reserved.
# SPDX-License-Identifier: MIT
"""Gitea API client with chunked upload support."""
import hashlib
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from io import BytesIO
from typing import BinaryIO, Callable, Dict, Any, List, Optional
from urllib.parse import urljoin, quote

import requests

from .exceptions import APIError, AuthenticationError, NotFoundError, UploadError
from .models import (
    User,
    Repository,
    Release,
    UploadSession,
    UploadResult,
    Progress,
)


class GiteaClient:
    """Client for the Gitea API.

    Args:
        base_url: The base URL of the Gitea instance
            (e.g., "https://gitea.example.com"). A base URL that includes a
            sub-path (e.g., "https://example.com/gitea") is supported.
        token: API token for authentication
        timeout: Request timeout in seconds (default: 30)
    """

    DEFAULT_CHUNK_SIZE = 10 * 1024 * 1024  # 10MB
    MAX_PARALLEL = 8  # hard cap on concurrent upload workers

    def __init__(
        self,
        base_url: str,
        token: Optional[str] = None,
        timeout: int = 30,
    ):
        self.base_url = base_url.rstrip("/")
        self.token = token
        self.timeout = timeout
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": "gitea-sdk-python/1.0",
        })
        if token:
            self.session.headers["Authorization"] = f"token {token}"

    def _url(self, path: str) -> str:
        """Build the full URL for an API path.

        Uses plain concatenation rather than urljoin: all API paths here are
        absolute ("/api/v1/..."), and urljoin would discard any sub-path
        component of base_url (e.g. "https://host/gitea" -> "https://host/api/...").
        base_url is already stripped of trailing slashes in __init__.
        """
        return self.base_url + path

    def _request(
        self,
        method: str,
        path: str,
        json: Optional[Dict[str, Any]] = None,
        data: Optional[bytes] = None,
        headers: Optional[Dict[str, str]] = None,
    ) -> Dict[str, Any]:
        """Make an API request and decode the response.

        Args:
            method: HTTP method ("GET", "POST", "PUT", "DELETE", ...)
            path: API path starting with "/"
            json: JSON body (mutually exclusive with ``data``)
            data: Raw request body (takes precedence over ``json``)
            headers: Extra request headers merged on top of the session's

        Returns:
            Decoded JSON body as a dict; ``{}`` for empty/204 responses;
            ``{"content": <text>}`` when the body is not valid JSON.

        Raises:
            AuthenticationError: on HTTP 401
            NotFoundError: on HTTP 404
            APIError: on any other HTTP error or transport failure
        """
        url = self._url(path)
        req_headers = headers or {}
        try:
            if data is not None:
                resp = self.session.request(
                    method,
                    url,
                    data=data,
                    headers=req_headers,
                    timeout=self.timeout,
                )
            else:
                # Fix: caller-supplied headers were previously dropped on
                # the JSON branch; pass them on both branches.
                resp = self.session.request(
                    method,
                    url,
                    json=json,
                    headers=req_headers,
                    timeout=self.timeout,
                )
        except requests.RequestException as e:
            raise APIError(f"Request failed: {e}") from e

        if resp.status_code == 401:
            raise AuthenticationError()
        if resp.status_code == 404:
            raise NotFoundError("Resource")
        if resp.status_code >= 400:
            # Keep the try body minimal: only the JSON decode can raise
            # ValueError here; the structured APIError is raised outside it.
            try:
                error_data = resp.json()
            except ValueError:
                raise APIError(resp.text, status=resp.status_code) from None
            raise APIError(
                message=error_data.get("message", resp.text),
                code=error_data.get("code"),
                status=resp.status_code,
                details=error_data.get("details"),
            )

        if resp.status_code == 204 or not resp.content:
            return {}
        try:
            return resp.json()
        except ValueError:
            # Non-JSON success body (e.g. plain text); preserve it.
            return {"content": resp.text}

    # User methods

    def get_current_user(self) -> User:
        """Get the currently authenticated user."""
        data = self._request("GET", "/api/v1/user")
        return User.from_dict(data)

    def get_user(self, username: str) -> User:
        """Get a user by username."""
        data = self._request("GET", f"/api/v1/users/{quote(username)}")
        return User.from_dict(data)

    # Repository methods

    def get_repository(self, owner: str, repo: str) -> Repository:
        """Get a repository by owner and name."""
        data = self._request("GET", f"/api/v1/repos/{quote(owner)}/{quote(repo)}")
        return Repository.from_dict(data)

    def list_user_repos(self, username: str) -> List[Repository]:
        """List repositories for a user."""
        data = self._request("GET", f"/api/v1/users/{quote(username)}/repos")
        return [Repository.from_dict(r) for r in data]

    # Release methods

    def get_release(self, owner: str, repo: str, release_id: int) -> Release:
        """Get a release by ID."""
        data = self._request(
            "GET",
            f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/{release_id}",
        )
        return Release.from_dict(data)

    def get_release_by_tag(self, owner: str, repo: str, tag: str) -> Release:
        """Get a release by tag name."""
        data = self._request(
            "GET",
            f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/tags/{quote(tag)}",
        )
        return Release.from_dict(data)

    def list_releases(self, owner: str, repo: str) -> List[Release]:
        """List all releases for a repository."""
        data = self._request(
            "GET",
            f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases",
        )
        return [Release.from_dict(r) for r in data]

    # Chunked upload methods

    def upload_release_asset(
        self,
        owner: str,
        repo: str,
        release_id: int,
        file: BinaryIO,
        filename: str,
        chunk_size: int = DEFAULT_CHUNK_SIZE,
        parallel: int = 4,
        verify_checksum: bool = True,
        progress_callback: Optional[Callable[[Progress], None]] = None,
    ) -> UploadResult:
        """Upload a release asset using chunked upload.

        Args:
            owner: Repository owner
            repo: Repository name
            release_id: Release ID
            file: Seekable binary file-like object to upload
            filename: Name for the uploaded file
            chunk_size: Size of each chunk in bytes (default: 10MB)
            parallel: Number of parallel upload workers (default: 4,
                capped at MAX_PARALLEL)
            verify_checksum: Whether to compute and send a SHA-256 checksum
                of the file (default: True)
            progress_callback: Callback invoked with a Progress after each
                uploaded chunk

        Returns:
            UploadResult with details of the uploaded asset

        Raises:
            UploadError: if any chunk fails to upload
        """
        # Determine total size without reading the whole file.
        file.seek(0, 2)  # seek to end
        file_size = file.tell()
        file.seek(0)  # back to start

        checksum = self._sha256(file) if verify_checksum else None

        session = self._create_upload_session(
            owner, repo, release_id, filename, file_size, chunk_size, checksum
        )

        try:
            self._upload_chunks(session, file, parallel, progress_callback)
        except UploadError:
            # Already carries session/chunk context from _upload_chunks;
            # re-wrapping (as before) would discard the chunk number.
            raise
        except Exception as e:
            raise UploadError(
                f"Upload failed: {e}",
                session_id=session.id,
            ) from e

        return self._complete_upload(session.id)

    @staticmethod
    def _sha256(file: BinaryIO) -> str:
        """Compute the SHA-256 hex digest of *file* and rewind it."""
        hasher = hashlib.sha256()
        while True:
            block = file.read(65536)
            if not block:
                break
            hasher.update(block)
        file.seek(0)
        return hasher.hexdigest()

    def _create_upload_session(
        self,
        owner: str,
        repo: str,
        release_id: int,
        filename: str,
        file_size: int,
        chunk_size: int,
        checksum: Optional[str] = None,
    ) -> UploadSession:
        """Create a chunked upload session on the server."""
        body = {
            "name": filename,
            "size": file_size,
            "chunk_size": chunk_size,
        }
        if checksum:
            body["checksum"] = checksum
        data = self._request(
            "POST",
            f"/api/v1/repos/{quote(owner)}/{quote(repo)}/releases/{release_id}/assets/upload-session",
            json=body,
        )
        return UploadSession.from_dict(data)

    def _put_chunk(self, session_id: str, chunk_num: int, chunk_data: bytes) -> None:
        """PUT a single chunk of an upload session."""
        self._request(
            "PUT",
            f"/api/v1/repos/uploads/{session_id}/chunks/{chunk_num}",
            data=chunk_data,
            headers={"Content-Type": "application/octet-stream"},
        )

    @staticmethod
    def _make_progress(
        bytes_uploaded: int,
        bytes_since_start: int,
        file_size: int,
        chunks_done: int,
        total_chunks: int,
        start_time: float,
    ) -> Progress:
        """Build a Progress snapshot from upload counters.

        ``bytes_since_start`` is the byte count transferred in *this* run
        (differs from ``bytes_uploaded`` on resume), used for the speed/ETA.
        """
        elapsed = time.time() - start_time
        speed = bytes_since_start / elapsed if elapsed > 0 else 0
        remaining = file_size - bytes_uploaded
        eta = remaining / speed if speed > 0 else 0
        # Guard the zero-size edge case instead of dividing by zero.
        percent = bytes_uploaded / file_size * 100 if file_size else 100.0
        return Progress(
            bytes_done=bytes_uploaded,
            bytes_total=file_size,
            chunks_done=chunks_done,
            chunks_total=total_chunks,
            percent=percent,
            speed=speed,
            eta_seconds=eta,
        )

    def _upload_chunks(
        self,
        session: UploadSession,
        file: BinaryIO,
        parallel: int,
        progress_callback: Optional[Callable[[Progress], None]] = None,
    ) -> None:
        """Upload all file chunks in parallel.

        Raises:
            UploadError: with the failing chunk number if any PUT fails.
        """
        total_chunks = session.total_chunks
        chunk_size = session.chunk_size
        file_size = session.file_size
        bytes_uploaded = 0
        chunks_done = 0
        start_time = time.time()

        # Read chunks sequentially up front so workers never share the file
        # handle. NOTE(review): this holds the entire file in memory, same
        # as the original implementation — acceptable for release assets,
        # but worth revisiting for very large files.
        chunks = []
        for chunk_num in range(total_chunks):
            offset = chunk_num * chunk_size
            size = min(chunk_size, file_size - offset)
            file.seek(offset)
            chunks.append((chunk_num, file.read(size)))

        parallel = min(parallel, self.MAX_PARALLEL)
        with ThreadPoolExecutor(max_workers=parallel) as executor:
            futures = {
                executor.submit(self._put_chunk, session.id, num, data): (num, len(data))
                for num, data in chunks
            }
            for future in as_completed(futures):
                chunk_num, size = futures[future]
                try:
                    future.result()
                except Exception as e:
                    raise UploadError(
                        f"Failed to upload chunk {chunk_num}: {e}",
                        session_id=session.id,
                        chunk=chunk_num,
                    ) from e
                bytes_uploaded += size
                # Fix: chunks finish out of order under as_completed, so
                # count completions instead of reporting chunk_num + 1.
                chunks_done += 1
                if progress_callback:
                    progress_callback(self._make_progress(
                        bytes_uploaded,
                        bytes_uploaded,
                        file_size,
                        chunks_done,
                        total_chunks,
                        start_time,
                    ))

    def _complete_upload(self, session_id: str) -> UploadResult:
        """Complete a chunked upload."""
        data = self._request(
            "POST",
            f"/api/v1/repos/uploads/{session_id}/complete",
        )
        return UploadResult.from_dict(data)

    def cancel_upload(self, session_id: str) -> None:
        """Cancel an upload session."""
        self._request("DELETE", f"/api/v1/repos/uploads/{session_id}")

    def get_upload_session(self, session_id: str) -> UploadSession:
        """Get the status of an upload session."""
        data = self._request("GET", f"/api/v1/repos/uploads/{session_id}")
        return UploadSession.from_dict(data)

    def resume_upload(
        self,
        session_id: str,
        file: BinaryIO,
        progress_callback: Optional[Callable[[Progress], None]] = None,
    ) -> UploadResult:
        """Resume an interrupted upload.

        Args:
            session_id: ID of the upload session to resume
            file: File-like object to upload (must be the same content the
                session was created for)
            progress_callback: Callback function for progress updates

        Returns:
            UploadResult with details of the uploaded asset

        Raises:
            UploadError: if the session is already complete or has expired
        """
        session = self.get_upload_session(session_id)

        if session.status == "complete":
            raise UploadError("Upload already completed", session_id=session_id)
        if session.status == "expired":
            raise UploadError("Upload session has expired", session_id=session_id)

        # Upload remaining chunks sequentially, then finalize.
        self._upload_remaining_chunks(session, file, progress_callback)
        return self._complete_upload(session_id)

    def _upload_remaining_chunks(
        self,
        session: UploadSession,
        file: BinaryIO,
        progress_callback: Optional[Callable[[Progress], None]] = None,
    ) -> None:
        """Upload the chunks a resumed session has not yet received.

        Starts at ``session.chunks_received`` and proceeds sequentially
        (unlike _upload_chunks, no parallelism on resume).
        """
        total_chunks = session.total_chunks
        chunk_size = session.chunk_size
        file_size = session.file_size
        start_chunk = session.chunks_received
        # Bytes the server already has; all received chunks are full-sized
        # since only the final chunk can be short.
        already_uploaded = start_chunk * chunk_size
        bytes_uploaded = already_uploaded
        start_time = time.time()

        for chunk_num in range(start_chunk, total_chunks):
            offset = chunk_num * chunk_size
            size = min(chunk_size, file_size - offset)
            file.seek(offset)
            chunk_data = file.read(size)
            self._put_chunk(session.id, chunk_num, chunk_data)
            bytes_uploaded += len(chunk_data)
            if progress_callback:
                # Sequential loop, so chunk_num + 1 is the true count here;
                # speed is measured over bytes sent since the resume began.
                progress_callback(self._make_progress(
                    bytes_uploaded,
                    bytes_uploaded - already_uploaded,
                    file_size,
                    chunk_num + 1,
                    total_chunks,
                    start_time,
                ))