wcpan.drive.core.types¶
Drive Interface¶
- class wcpan.drive.core.types.Drive¶
Bases:
objectHigh-level interface for cloud drive operations.
Drive is the main user-facing API for interacting with cloud storage. It combines FileService (cloud API) and SnapshotService (local cache) to provide efficient, feature-rich operations.
- Key Features:
Path-based and ID-based node access
Directory traversal and tree walking
File upload/download with resumable support
Node manipulation (create, move, rename, delete)
Change synchronization
Trash management
- Usage Pattern:
Create Drive instances using create_drive() as an async context manager. The Drive interface ensures proper initialization and cleanup.
- Authentication:
Most operations require authentication. Use is_authenticated() to check and authenticate() to establish credentials.
- Thread Safety:
Drive operations are async-safe but instances should not be shared across concurrent tasks without external synchronization.
Example
Basic usage pattern:
>>> async with create_drive( ... file=create_file_service, ... snapshot=create_snapshot_service, ... ) as drive: ... # Check authentication ... if not await drive.is_authenticated(): ... await drive.authenticate() ... ... # Sync to get latest changes ... async for change in drive.sync(): ... print(f"Change: {change}") ... ... # Navigate filesystem ... root = await drive.get_root() ... children = await drive.get_children(root) ... ... # Upload a file ... async with drive.upload_file("doc.txt", root, size=1024) as f: ... await f.write(b"Hello World") ... await f.flush() ... node = await f.node()
- abstractmethod async get_root()¶
Get the root directory node.
Returns the root node of the drive’s directory structure.
- Returns:
Node representing the root directory.
- Raises:
NodeNotFoundError – If not yet synced (cache empty).
- Return type:
Example
Getting root and listing contents:
>>> root = await drive.get_root() >>> print(f"Root ID: {root.id}") >>> children = await drive.get_children(root)
- abstractmethod async get_node_by_id(node_id)¶
Get a node by its unique identifier.
Fast lookup using the node’s ID, retrieved from cache.
- Parameters:
node_id (str) – The node’s unique ID.
- Returns:
The Node with the given ID.
- Raises:
NodeNotFoundError – If node not found in cache.
- Return type:
Example
Getting node by ID:
>>> node = await drive.get_node_by_id("abc123") >>> print(f"Found: {node.name}")
- abstractmethod async get_node_by_path(path)¶
Get a node by its absolute path.
Resolves the path through the cached directory structure.
- Parameters:
path (PurePath) – Absolute path to the node (must start with “/”).
- Returns:
The Node at the given path.
- Raises:
NodeNotFoundError – If path does not exist.
ValueError – If path is not absolute.
- Return type:
Example
Getting node by path:
>>> from pathlib import PurePath >>> node = await drive.get_node_by_path(PurePath("/docs/file.txt")) >>> print(f"Size: {node.size} bytes")
- abstractmethod async resolve_path(node)¶
Resolve the absolute path of a node.
Constructs the full path by walking up the parent chain.
- Parameters:
node (Node) – The Node to resolve.
- Returns:
Absolute PurePath to the node.
- Return type:
Example
Resolving node path:
>>> node = await drive.get_node_by_id("abc123") >>> path = await drive.resolve_path(node) >>> print(f"Path: {path}")
- abstractmethod async get_child_by_name(name, parent)¶
Find a child node by name under a parent.
Searches for a direct child with the given name.
- Parameters:
- Returns:
The child Node with matching name.
- Raises:
NodeNotFoundError – If no child with that name exists.
- Return type:
Example
Finding child by name:
>>> root = await drive.get_root() >>> docs = await drive.get_child_by_name("Documents", root)
- abstractmethod async get_children(parent)¶
Get all direct children of a directory.
Returns both files and subdirectories under the parent.
- Parameters:
parent (Node) – The parent directory Node.
- Returns:
List of child Nodes (may be empty).
- Return type:
Example
Listing directory contents:
>>> root = await drive.get_root() >>> children = await drive.get_children(root) >>> for child in children: ... type_ = "DIR" if child.is_directory else "FILE" ... print(f"{type_}: {child.name}")
- abstractmethod async get_trashed_nodes(flatten=False)¶
Get all nodes in the trash.
- Parameters:
flatten (bool) – If False (default), exclude nodes whose parent is also trashed. If True, return all trashed nodes.
- Returns:
List of trashed Nodes (may be empty).
- Return type:
Example
Listing trash contents:
>>> trashed = await drive.get_trashed_nodes() >>> for node in trashed: ... print(f"Trashed: {node.name}")
Getting all trashed nodes including nested:
>>> all_trashed = await drive.get_trashed_nodes(flatten=True)
- abstractmethod async find_nodes_by_regex(pattern)¶
Find nodes whose names match a regex pattern.
Searches the entire drive for nodes with names matching the given regular expression.
- Parameters:
pattern (str) – Regular expression pattern to match.
- Returns:
List of matching Nodes (may be empty).
- Return type:
Example
Finding all Python files:
>>> nodes = await drive.find_nodes_by_regex(r".*\.py$") >>> for node in nodes: ... print(f"Python file: {node.name}")
- abstractmethod walk(node, *, include_trashed=False)¶
Recursively traverse directory tree from a starting node.
Similar to os.walk(), yields directory information in depth-first order.
- Parameters:
- Yields:
Tuples of (directory, subdirectories, files) where – - directory: Current directory Node being visited - subdirectories: List of direct subdirectory Nodes - files: List of direct file Nodes
- Return type:
Example
Walking entire drive:
>>> root = await drive.get_root() >>> async for directory, subdirs, files in drive.walk(root): ... print(f"Directory: {directory.name}") ... for file in files: ... print(f" File: {file.name} ({file.size} bytes)")
Finding all images:
>>> root = await drive.get_root() >>> async for directory, _, files in drive.walk(root): ... for file in files: ... if file.is_image: ... path = await drive.resolve_path(file) ... print(f"Image: {path}")
- abstractmethod async create_directory(name, parent, *, exist_ok=False)¶
Create a new directory.
Creates a directory with the given name under the parent.
- Parameters:
- Returns:
Node representing the created (or existing if exist_ok) directory.
- Raises:
AuthenticationError – If not authenticated.
NodeExistsError – If exist_ok=False and directory already exists.
ValueError – If name contains invalid characters (/ or ).
- Return type:
Example
Creating a directory:
>>> root = await drive.get_root() >>> docs = await drive.create_directory("Documents", root) >>> print(f"Created: {docs.name} ({docs.id})")
Creating with exist_ok:
>>> docs = await drive.create_directory( ... "Documents", ... root, ... exist_ok=True, ... )
- abstractmethod download_file(node)¶
Download a file from the cloud drive.
Returns an async context manager that yields a ReadableFile for streaming the file content.
- Parameters:
node (Node) – The file Node to download.
- Returns:
Async context manager yielding ReadableFile.
- Raises:
AuthenticationError – If not authenticated.
ValueError – If node is a directory.
- Return type:
Example
Downloading a file:
>>> node = await drive.get_node_by_path(PurePath("/file.txt")) >>> async with drive.download_file(node) as file: ... content = await file.read(1024) ... while content: ... process(content) ... content = await file.read(1024)
Using higher-level helper:
>>> from wcpan.drive.core.lib import download_file_to_local >>> from pathlib import Path >>> local_path = await download_file_to_local( ... drive, ... node, ... Path("/downloads"), ... )
- abstractmethod upload_file(name, parent, *, size=None, mime_type=None, media_info=None)¶
Upload a file to the cloud drive.
Returns an async context manager that yields a WritableFile for streaming content to the cloud.
- Parameters:
name (str) – Name for the uploaded file.
parent (Node) – Parent directory Node where file will be uploaded.
size (int | None) – File size in bytes, or None if unknown.
mime_type (str | None) – MIME type string, or None for default.
media_info (MediaInfo | None) – Optional MediaInfo for images/videos.
- Returns:
Async context manager yielding WritableFile.
- Raises:
AuthenticationError – If not authenticated.
NodeExistsError – If file with same name already exists.
ValueError – If name contains invalid characters (/ or ).
- Return type:
Example
Uploading a file:
>>> root = await drive.get_root() >>> async with drive.upload_file("doc.txt", root, size=11) as file: ... await file.write(b"Hello World") ... await file.flush() ... node = await file.node() ... print(f"Uploaded: {node.id}")
Using higher-level helper:
>>> from wcpan.drive.core.lib import upload_file_from_local >>> from pathlib import Path >>> node = await upload_file_from_local( ... drive, ... Path("/local/file.jpg"), ... parent, ... mime_type="image/jpeg", ... )
- abstractmethod async purge_trash()¶
Permanently delete all items in the trash.
Empties the trash bin, permanently removing all trashed nodes. This operation cannot be undone.
- Raises:
AuthenticationError – If not authenticated.
- Return type:
None
Example
Purging trash:
>>> await drive.purge_trash() >>> trashed = await drive.get_trashed_nodes() >>> assert len(trashed) == 0
- abstractmethod async move(node, *, new_parent=None, new_name=None)¶
Relocate or rename a node.
At least one parameter must be non-None.
- Parameters:
- Returns:
New Node instance with updated properties.
- Raises:
AuthenticationError – If not authenticated.
ValueError – If all parameters are None, or if trying to move root node, or if new_name contains invalid characters.
- Return type:
- abstractmethod async delete(node, *, permanent=False)¶
Delete a node.
Soft-deletes by default (best-effort per backend). Pass permanent=True for a hard delete that cannot be undone.
- Parameters:
- Raises:
AuthenticationError – If not authenticated.
- Return type:
None
- abstractmethod async restore(node)¶
Restore a trashed node.
- Parameters:
node (Node) – The trashed Node to restore.
- Returns:
Updated Node instance reflecting the restored state.
- Raises:
NotImplementedError – If backend doesn’t support restore.
- Return type:
- abstractmethod sync()¶
Synchronize the local cache with cloud storage.
Fetches changes from the cloud and updates the local snapshot. This is the ONLY operation that modifies the cached snapshot.
Should be called periodically or before operations that require up-to-date metadata.
- Yields:
ChangeAction instances for each change detected.
- Raises:
AuthenticationError – If not authenticated.
- Return type:
Example
Basic sync:
>>> async for change in drive.sync(): ... if change[0]: # RemoveAction ... print(f"Removed: {change[1]}") ... else: # UpdateAction ... print(f"Updated: {change[1].name}")
Sync with type guards:
>>> from wcpan.drive.core.lib import is_remove, is_update >>> async for change in drive.sync(): ... if is_remove(change): ... removed, node_id = change ... handle_removal(node_id) ... elif is_update(change): ... updated, node = change ... handle_update(node)
- abstractmethod async get_hasher_factory(node)¶
Get a factory for creating hash calculators for a given node.
Returns a factory function compatible with the hash algorithm used by the backend that owns the node.
- Parameters:
node (Node) – The node whose backend’s hasher factory is returned.
- Returns:
Factory function for creating Hasher instances.
- Return type:
CreateHasher
Example
Computing file hash:
>>> hasher_factory = await drive.get_hasher_factory(node) >>> hasher = await hasher_factory() >>> with open("file.txt", "rb") as f: ... while chunk := f.read(8192): ... await hasher.update(chunk) >>> file_hash = await hasher.hexdigest()
- abstractmethod async is_authenticated()¶
Check if the drive is authenticated and ready for operations.
Returns True if authenticated, False otherwise. Does not perform I/O - checks cached authentication state.
- Returns:
True if authenticated, False otherwise.
- Return type:
Example
Checking before operations:
>>> if await drive.is_authenticated(): ... await drive.sync() ... else: ... print("Not authenticated")
- abstractmethod async authenticate()¶
Authenticate the drive for use.
Establishes authentication credentials. The specific mechanism depends on the underlying FileService implementation.
This method is NOT interactive. OAuth implementations should handle token exchange separately.
- Raises:
AuthenticationError – If authentication fails.
- Return type:
None
Example
Authenticating before use:
>>> async with create_drive(...) as drive: ... await drive.authenticate() ... await drive.sync()
Conditional authentication:
>>> if not await drive.is_authenticated(): ... await drive.authenticate() >>> await drive.upload_file(...)
Service Interfaces¶
- class wcpan.drive.core.types.FileService¶
Bases:
ServiceBackend implementation interface for cloud storage operations.
FileService defines the contract for interacting with cloud storage APIs. Implementations provide the actual I/O operations for a specific cloud provider (e.g., Google Drive, Dropbox, OneDrive).
- Implementations must:
Provide all abstract methods
Return api_version = 5
Handle authentication appropriately
Support async context manager protocol
Raise appropriate exceptions on errors
- Used With:
Paired with a SnapshotService and passed to create_drive().
- Thread Safety:
Implementations should handle concurrent operations safely.
Example
Basic implementation structure:
>>> class MyCloudService(FileService): ... @property ... def api_version(self) -> int: ... return 5 ... ... async def get_root(self) -> Node: ... # Fetch root from API ... pass ... ... # ... implement other abstract methods ...
Using with create_drive:
>>> @asynccontextmanager >>> async def create_my_service(): ... service = MyCloudService(credentials) ... try: ... yield service ... finally: ... await service.close() >>> >>> async with create_drive( ... file=create_my_service, ... snapshot=create_snapshot, ... ) as drive: ... await drive.sync()
- abstractmethod async get_initial_cursor()¶
Get the initial checkpoint cursor for change tracking.
Returns a cursor representing the current state of the cloud drive before any changes are tracked. Used as the starting point for the first sync operation.
- Returns:
Initial cursor string. Format is implementation-specific.
- Return type:
Example
Implementation pattern:
>>> async def get_initial_cursor(self) -> str: ... # For some APIs, this might be a special token ... return "start_token" ... # For others, might be current timestamp ... return str(int(time.time()))
- abstractmethod async get_root()¶
Fetch the root directory node from the cloud storage.
Retrieves the root directory’s metadata from the cloud API and returns it as a Node instance. This is typically cached by the SnapshotService after the first call.
- Returns:
Node representing the root directory.
- Raises:
AuthenticationError – If not authenticated.
- Return type:
Example
Implementation pattern:
>>> async def get_root(self) -> Node: ... response = await self.api_client.get("/root") ... return self._api_response_to_node(response)
- abstractmethod get_changes(cursor)¶
Fetch incremental changes from the cloud storage.
Returns an async iterator that yields pages of changes since the given cursor. Each iteration provides a list of changes and the cursor for the next page.
This method implements polling or pagination to retrieve all changes since the given checkpoint.
- Parameters:
cursor (str) – Starting cursor from previous sync or get_initial_cursor().
- Yields:
Tuples of (changes, next_cursor) where – - changes: List of ChangeAction (additions/updates/removals) - next_cursor: Cursor for the next page or next sync
- Return type:
Example
Implementation pattern:
>>> async def get_changes( ... self, ... cursor: str, ... ) -> AsyncIterator[tuple[list[ChangeAction], str]]: ... while True: ... response = await self.api_client.get_changes(cursor) ... changes = [self._convert_change(c) for c in response.changes] ... yield changes, response.next_cursor ... if not response.has_more: ... break ... cursor = response.next_cursor
Usage in sync:
>>> async for changes, next_cursor in service.get_changes(cursor): ... await snapshot.apply_changes(changes, next_cursor)
- abstractmethod async move(node, *, new_parent, new_name)¶
Relocate or rename a node.
Performs one or more operations on the node: - Rename: Set new_name to change the node’s name - Move: Set new_parent to move to a different directory
At least one of the optional parameters must be non-None.
- Parameters:
- Returns:
New Node instance reflecting the changes.
- Raises:
AuthenticationError – If not authenticated.
ValueError – If all optional parameters are None.
- Return type:
- abstractmethod async delete(node, *, permanent=False)¶
Delete a node.
Soft-deletes by default (best-effort per backend). Pass permanent=True for a hard delete that cannot be undone.
- Parameters:
- Raises:
AuthenticationError – If not authenticated.
- Return type:
None
- abstractmethod async restore(node)¶
Restore a trashed node.
- Parameters:
node (Node) – The trashed Node to restore.
- Returns:
Updated Node instance reflecting the restored state.
- Raises:
NotImplementedError – If backend doesn’t support restore.
- Return type:
- abstractmethod async purge_trash()¶
Permanently delete all trashed items.
Empties the trash bin, permanently removing all trashed nodes. This operation cannot be undone.
- Raises:
AuthenticationError – If not authenticated.
- Return type:
None
Example
Implementation pattern:
>>> async def purge_trash(self) -> None: ... await self.api_client.empty_trash()
- abstractmethod async create_directory(name, parent, *, exist_ok, private)¶
Create a new directory.
Creates a directory with the given name under the specified parent. Behavior when the directory exists depends on exist_ok.
- Parameters:
name (str) – Name for the new directory.
parent (Node) – Parent directory Node.
exist_ok (bool) – If True, don’t raise error if directory exists. If False, raise NodeExistsError if directory exists.
private (PrivateDict | None) – Optional service-specific metadata.
- Returns:
Node representing the created (or existing) directory.
- Raises:
AuthenticationError – If not authenticated.
NodeExistsError – If exist_ok=False and directory exists.
- Return type:
Example
Implementation pattern:
>>> async def create_directory( ... self, ... name: str, ... parent: Node, ... *, ... exist_ok: bool, ... private: PrivateDict | None, ... ) -> Node: ... if not exist_ok: ... existing = await self._find_child(parent.id, name) ... if existing: ... raise NodeExistsError(existing) ... response = await self.api_client.create_folder( ... name=name, ... parent_id=parent.id, ... ) ... return self._api_response_to_node(response)
- abstractmethod download_file(node)¶
Create a readable stream for downloading a file.
Returns an async context manager that yields a ReadableFile for streaming the file content from cloud storage.
- Parameters:
node (Node) – The file Node to download.
- Returns:
Async context manager yielding ReadableFile.
- Raises:
AuthenticationError – If not authenticated.
ValueError – If node is a directory.
- Return type:
Example
Implementation pattern:
>>> @asynccontextmanager >>> async def download_file( ... self, ... node: Node, ... ) -> AsyncIterator[ReadableFile]: ... stream = await self.api_client.download(node.id) ... try: ... yield MyReadableFile(stream, node) ... finally: ... await stream.close()
- abstractmethod upload_file(name, parent, *, size, mime_type, media_info, private)¶
Create a writable stream for uploading a file.
Returns an async context manager that yields a WritableFile for streaming content to cloud storage.
- Parameters:
name (str) – Name for the uploaded file.
parent (Node) – Parent directory Node.
size (int | None) – File size in bytes, or None if unknown.
mime_type (str | None) – MIME type string, or None for default.
media_info (MediaInfo | None) – Optional media metadata for images/videos.
private (PrivateDict | None) – Optional service-specific metadata.
- Returns:
Async context manager yielding WritableFile.
- Raises:
AuthenticationError – If not authenticated.
NodeExistsError – If file with same name already exists.
- Return type:
Example
Implementation pattern:
>>> @asynccontextmanager >>> async def upload_file( ... self, ... name: str, ... parent: Node, ... *, ... size: int | None, ... mime_type: str | None, ... media_info: MediaInfo | None, ... private: PrivateDict | None, ... ) -> AsyncIterator[WritableFile]: ... stream = await self.api_client.upload_start( ... name=name, ... parent_id=parent.id, ... size=size, ... ) ... try: ... yield MyWritableFile(stream) ... finally: ... await stream.finalize()
- abstractmethod async get_hasher_factory()¶
Get a factory for creating hash calculators.
Returns a factory function that creates Hasher instances compatible with this service’s hash algorithm (e.g., MD5, SHA-1, SHA-256).
- Returns:
Factory function for creating Hasher instances.
- Return type:
CreateHasher
Example
Implementation pattern:
>>> async def get_hasher_factory(self) -> CreateHasher: ... async def create_hasher() -> Hasher: ... return MD5Hasher() ... return create_hasher
- abstractmethod async is_authenticated()¶
Check if the service is authenticated and ready for operations.
Checks cached authentication state without performing I/O if possible. Should verify that credentials/tokens are present and not obviously expired.
- Returns:
True if authenticated and ready, False otherwise.
- Return type:
Example
Implementation pattern:
>>> async def is_authenticated(self) -> bool: ... return self.access_token is not None
- abstractmethod async authenticate()¶
Authenticate the service for use.
Establishes authentication using implementation-specific mechanisms (OAuth tokens, API keys, etc.). Implementations must manage their own credential storage and token refresh.
This method is NOT interactive. OAuth implementations should handle token exchange separately before calling this method.
- Raises:
AuthenticationError – If authentication fails.
- Return type:
None
Example
Implementation pattern:
>>> async def authenticate(self) -> None: ... if not self.credentials: ... raise AuthenticationError("No credentials configured") ... response = await self.api_client.validate_token( ... self.credentials.access_token ... ) ... if not response.valid: ... raise AuthenticationError("Invalid credentials")
- class wcpan.drive.core.types.SnapshotService¶
Bases:
ServiceLocal cache interface for node metadata and change tracking.
SnapshotService defines the contract for maintaining a local cache of the cloud drive’s directory structure and file metadata. This enables: - Fast path lookups without API calls - Efficient change synchronization - Offline access to metadata
- Implementations typically use:
SQLite database for persistence
In-memory structures for performance
Transaction support for atomic updates
- Used With:
Paired with a FileService and passed to create_drive().
- Thread Safety:
Implementations should handle concurrent access safely.
Example
Basic implementation structure:
>>> class SQLiteSnapshot(SnapshotService): ... @property ... def api_version(self) -> int: ... return 5 ... ... async def get_root(self) -> Node: ... # Query from database ... pass ... ... async def apply_changes( ... self, ... changes: list[ChangeAction], ... cursor: str, ... ) -> None: ... # Update database in transaction ... pass ... ... # ... implement other abstract methods ...
- abstractmethod async get_root()¶
Get the root directory node from the cache.
Retrieves the cached root node. This should be fast and not involve I/O to the cloud service.
- Returns:
Cached Node representing the root directory.
- Raises:
NodeNotFoundError – If root not yet cached (need to sync first).
- Return type:
Example
Implementation pattern:
>>> async def get_root(self) -> Node: ... row = await self.db.fetch_one("SELECT * FROM nodes WHERE parent_id IS NULL") ... if not row: ... raise NodeNotFoundError("root") ... return self._row_to_node(row)
- abstractmethod async set_root(node)¶
Store the root directory node in the cache.
Called during the first sync to initialize the cache with the root node metadata.
- Parameters:
node (Node) – The root Node to cache.
- Return type:
None
Example
Implementation pattern:
>>> async def set_root(self, node: Node) -> None: ... await self.db.execute( ... "INSERT OR REPLACE INTO nodes VALUES (...)", ... self._node_to_values(node), ... )
- abstractmethod async get_current_cursor()¶
Get the current synchronization cursor.
Returns the cursor from the most recent sync operation. If no sync has occurred yet (first run), returns an empty string.
- Returns:
Current cursor string, or empty string if never synced.
- Return type:
Example
Implementation pattern:
>>> async def get_current_cursor(self) -> str: ... row = await self.db.fetch_one("SELECT cursor FROM metadata") ... return row["cursor"] if row else ""
- abstractmethod async get_node_by_id(node_id)¶
Retrieve a cached node by its ID.
Fast lookup of node metadata from the local cache using the node’s unique identifier.
- Parameters:
node_id (str) – The node’s unique ID.
- Returns:
The cached Node.
- Raises:
NodeNotFoundError – If node not found in cache.
- Return type:
Example
Implementation pattern:
>>> async def get_node_by_id(self, node_id: str) -> Node: ... row = await self.db.fetch_one( ... "SELECT * FROM nodes WHERE id = ?", ... node_id, ... ) ... if not row: ... raise NodeNotFoundError(node_id) ... return self._row_to_node(row)
- abstractmethod async get_node_by_path(path)¶
Resolve a node by its absolute path.
Traverses the cached directory structure to find the node at the given path.
- Parameters:
path (PurePath) – Absolute path to the node.
- Returns:
The Node at the given path.
- Raises:
NodeNotFoundError – If path does not exist in cache.
- Return type:
Example
Implementation pattern:
>>> async def get_node_by_path(self, path: PurePath) -> Node: ... node = await self.get_root() ... for part in path.parts[1:]: # Skip root "/" ... node = await self.get_child_by_name(part, node.id) ... return node
- abstractmethod async resolve_path_by_id(node_id)¶
Resolve the absolute path for a node ID.
Walks up the parent chain to construct the full path for the given node.
- Parameters:
node_id (str) – The node’s unique ID.
- Returns:
Absolute PurePath to the node.
- Raises:
NodeNotFoundError – If node not found in cache.
- Return type:
Example
Implementation pattern:
>>> async def resolve_path_by_id(self, node_id: str) -> PurePath: ... parts = [] ... node = await self.get_node_by_id(node_id) ... while node.parent_id: ... parts.insert(0, node.name) ... node = await self.get_node_by_id(node.parent_id) ... return PurePath("/", *parts)
- abstractmethod async get_child_by_name(name, parent_id)¶
Find a child node by name under a parent.
Searches for a direct child with the given name under the specified parent node.
- Parameters:
- Returns:
The child Node.
- Raises:
NodeNotFoundError – If child not found.
- Return type:
Example
Implementation pattern:
>>> async def get_child_by_name(self, name: str, parent_id: str) -> Node: ... row = await self.db.fetch_one( ... "SELECT * FROM nodes WHERE parent_id = ? AND name = ?", ... parent_id, ... name, ... ) ... if not row: ... raise NodeNotFoundError(name) ... return self._row_to_node(row)
- abstractmethod async get_children_by_id(parent_id)¶
Get all direct children of a parent node.
Retrieves the list of immediate children (both files and directories) under the specified parent.
- Parameters:
parent_id (str) – The parent node’s ID.
- Returns:
List of child Nodes (may be empty).
- Return type:
Example
Implementation pattern:
>>> async def get_children_by_id(self, parent_id: str) -> list[Node]: ... rows = await self.db.fetch_all( ... "SELECT * FROM nodes WHERE parent_id = ? ORDER BY name", ... parent_id, ... ) ... return [self._row_to_node(row) for row in rows]
- abstractmethod async get_trashed_nodes()¶
Get all nodes currently in the trash.
Retrieves all nodes marked as trashed from the cache.
Example
Implementation pattern:
>>> async def get_trashed_nodes(self) -> list[Node]: ... rows = await self.db.fetch_all( ... "SELECT * FROM nodes WHERE is_trashed = 1" ... ) ... return [self._row_to_node(row) for row in rows]
- abstractmethod async apply_changes(changes, cursor)¶
Apply change actions to the cache and update cursor.
Processes a list of changes (additions, updates, removals) and updates the cache accordingly. Should be atomic - either all changes apply successfully or none do.
- Parameters:
changes (list[ChangeAction]) – List of ChangeAction to apply.
cursor (str) – New cursor value to store after applying changes.
- Return type:
None
Example
Implementation pattern:
>>> async def apply_changes( ... self, ... changes: list[ChangeAction], ... cursor: str, ... ) -> None: ... async with self.db.transaction(): ... for change in changes: ... if change[0]: # RemoveAction ... await self.db.execute( ... "DELETE FROM nodes WHERE id = ?", ... change[1], ... ) ... else: # UpdateAction ... node = change[1] ... await self.db.execute( ... "INSERT OR REPLACE INTO nodes VALUES (...)", ... self._node_to_values(node), ... ) ... await self.db.execute( ... "UPDATE metadata SET cursor = ?", ... cursor, ... )
- abstractmethod async find_nodes_by_regex(pattern)¶
Find nodes whose names match a regex pattern.
Searches the cache for nodes with names matching the given regular expression pattern.
- Parameters:
pattern (str) – Regular expression pattern to match against node names.
- Returns:
List of matching Nodes (may be empty).
- Return type:
Example
Implementation pattern:
>>> async def find_nodes_by_regex(self, pattern: str) -> list[Node]: ... import re ... regex = re.compile(pattern) ... rows = await self.db.fetch_all("SELECT * FROM nodes") ... nodes = [self._row_to_node(row) for row in rows] ... return [n for n in nodes if regex.search(n.name)]
- class wcpan.drive.core.types.ReadableFile¶
Bases:
AsyncIterable[bytes]Async readable file interface for downloading cloud files.
Provides sequential and seekable read access to cloud file content. Returned by Drive.download_file() and FileService.download_file() as an async context manager.
This interface extends AsyncIterable[bytes], allowing iteration over file chunks.
- Thread Safety:
Not thread-safe. Each ReadableFile instance should be used from a single async task.
Example
Basic download pattern:
>>> async with drive.download_file(node) as file: ... data = await file.read(8192) ... while data: ... process(data) ... data = await file.read(8192)
Using as async iterator:
>>> async with drive.download_file(node) as file: ... async for chunk in file: ... process(chunk)
Seeking and resuming:
>>> async with drive.download_file(node) as file: ... await file.seek(1024) # Skip first 1KB ... data = await file.read(4096)
- abstractmethod async read(length)¶
Read at most length bytes from the file.
Reads up to the specified number of bytes from the current position. Returns fewer bytes if EOF is reached.
- Parameters:
length (int) – Maximum number of bytes to read.
- Returns:
Bytes read from the file. Empty bytes object indicates EOF.
- Return type:
Example
Reading in chunks:
>>> chunk = await file.read(8192) >>> if chunk: ... print(f"Read {len(chunk)} bytes")
- abstractmethod async seek(offset)¶
Seek to the specified position in the file.
Moves the file pointer to the given absolute offset from the beginning of the file. Subsequent reads will start from this position.
- Parameters:
offset (int) – Absolute byte offset from the beginning of the file.
- Returns:
The new absolute position.
- Return type:
Example
Resume download from specific position:
>>> await file.seek(1048576) # Seek to 1MB >>> data = await file.read(8192)
- abstractmethod async node()¶
Get the Node being read.
Returns the Node instance representing the file being downloaded. Useful for accessing metadata during download.
- Returns:
The Node instance for this file.
- Return type:
Example
Accessing node during download:
>>> async with drive.download_file(node) as file: ... current_node = await file.node() ... print(f"Downloading: {current_node.name}")
- class wcpan.drive.core.types.WritableFile¶
Bases:
objectAsync writable file interface for uploading files to cloud storage.
Provides sequential and seekable write access for uploading content. Returned by Drive.upload_file() and FileService.upload_file() as an async context manager.
Supports resumable uploads through seek() and tell() for recovering from interruptions.
- Thread Safety:
Not thread-safe. Each WritableFile instance should be used from a single async task.
Example
Basic upload pattern:
>>> async with drive.upload_file("file.txt", parent, size=1024) as file: ... with open("local.txt", "rb") as f: ... chunk = f.read(8192) ... while chunk: ... await file.write(chunk) ... chunk = f.read(8192) ... await file.flush() ... node = await file.node()
Resumable upload:
>>> async with drive.upload_file("large.bin", parent, size=size) as file: ... try: ... # ... write data ... ... await file.flush() ... except TimeoutError: ... offset = await file.tell() ... await file.seek(offset) ... # Continue from offset
- abstractmethod async tell()¶
Get the current write position.
Returns the absolute byte offset of the current write position. Used for resumable uploads to determine how much data was successfully written before an interruption.
- Returns:
Current byte offset from the beginning of the file.
- Return type:
Example
Checking upload progress:
>>> bytes_written = await file.tell() >>> print(f"Uploaded {bytes_written}/{total_size} bytes")
- abstractmethod async seek(offset)¶
Seek to the specified position for resuming uploads.
Moves the write pointer to the given absolute offset from the beginning. Used to resume uploads after interruptions.
- Parameters:
offset (int) – Absolute byte offset from the beginning.
- Returns:
The new absolute position.
- Return type:
Example
Resuming upload:
>>> last_position = await file.tell() >>> # ... connection lost ... >>> await file.seek(last_position) >>> # Continue writing from last position
- abstractmethod async write(chunk)¶
Write data to the upload stream.
Writes the given bytes to the file at the current position. May buffer data internally - call flush() to ensure data is sent to the server.
- Parameters:
chunk (bytes) – Bytes to write.
- Returns:
Number of bytes written to the buffer.
- Return type:
Example
Writing with progress tracking:
>>> total_written = 0 >>> for chunk in chunks: ... written = await file.write(chunk) ... total_written += written ... print(f"Progress: {total_written} bytes")
- abstractmethod async flush()¶
Flush buffered data to the server.
Ensures all buffered data is sent to the cloud storage backend. Should be called before closing the file or checking upload status.
Example
Flushing before getting node:
>>> await file.write(data) >>> await file.flush() >>> node = await file.node()
- Return type:
None
- abstractmethod async node()¶
Get the uploaded Node.
Returns the Node instance representing the uploaded file. Should be called after flush() to ensure the upload is complete.
- Returns:
The Node instance for the uploaded file.
- Raises:
NodeNotFoundError – If upload failed or is incomplete.
- Return type:
Example
Getting uploaded node:
>>> async with drive.upload_file("doc.pdf", parent, size=size) as file: ... await file.write(data) ... await file.flush() ... uploaded_node = await file.node() ... print(f"Uploaded: {uploaded_node.id}")
- class wcpan.drive.core.types.Hasher¶
Bases:
objectHash calculator for file integrity verification.
Provides streaming hash calculation compatible with the cloud service’s hash algorithm. Implementations must match the hashing method used by the specific cloud storage backend.
Supports incremental updates for efficient hashing of large files and can be copied mid-stream for checkpointing.
Example
Computing file hash:
>>> hasher_factory = await drive.get_hasher_factory() >>> hasher = await hasher_factory() >>> with open("file.txt", "rb") as f: ... while chunk := f.read(8192): ... await hasher.update(chunk) >>> file_hash = await hasher.hexdigest()
Using copy for checkpointing:
>>> hasher1 = await hasher_factory() >>> await hasher1.update(b"data") >>> hasher2 = await hasher1.copy() >>> await hasher1.update(b"more") >>> await hasher2.update(b"different") >>> hash1 = await hasher1.hexdigest() >>> hash2 = await hasher2.hexdigest()
- abstractmethod async update(data)¶
Feed data into the hash calculation.
Updates the hash with the provided data. Can be called multiple times to process data incrementally.
- Parameters:
data (bytes) – Bytes to include in the hash calculation.
- Return type:
None
Example
Incremental hashing:
>>> await hasher.update(b"Hello ") >>> await hasher.update(b"World") >>> digest = await hasher.hexdigest()
- abstractmethod async digest()¶
Get the raw binary hash digest.
Returns the hash as raw bytes. The length depends on the hash algorithm (e.g., 16 bytes for MD5, 32 bytes for SHA-256).
- Returns:
Raw binary hash digest.
- Return type:
Example
Getting binary digest:
>>> raw_hash = await hasher.digest() >>> print(f"Hash length: {len(raw_hash)} bytes")
- abstractmethod async hexdigest()¶
Get the hexadecimal hash digest.
Returns the hash as a lowercase hexadecimal string, which is typically used for comparison with Node.hash values.
- Returns:
Hexadecimal string representation of the hash.
- Return type:
Example
Comparing with node hash:
>>> computed = await hasher.hexdigest() >>> if computed == node.hash: ... print("Hash matches!")
- abstractmethod async copy()¶
Create an independent copy of this hasher.
Returns a new Hasher instance with the same internal state, allowing divergent hash calculations from the same checkpoint.
- Returns:
New Hasher instance with copied state.
- Return type:
Example
Checkpointing hash calculation:
>>> checkpoint = await hasher.copy() >>> await hasher.update(b"path1") >>> await checkpoint.update(b"path2")
Data Classes¶
- class wcpan.drive.core.types.Node(*, id, parent_id, name, is_directory, is_trashed, ctime, mtime, mime_type, hash, size, is_image, is_video, width, height, ms_duration, private)¶
Bases:
objectImmutable representation of a file or directory in the cloud drive.
Node is the core data structure representing both files and directories. All attributes are immutable - to modify a node, use Drive methods which return new Node instances with updated values.
- Parameters:
- ctime¶
Creation timestamp.
- Type:
- mtime¶
Last modification timestamp.
- Type:
- private¶
Service-specific metadata, or None if not used.
- Type:
Note
Node is frozen (immutable). To change properties, use Drive.move() or other Drive methods which return new Node instances.
Example
Accessing node properties:
>>> node = await drive.get_node_by_path(PurePath("/photos/image.jpg")) >>> print(f"{node.name}: {node.size} bytes") >>> if node.is_image: ... print(f"Dimensions: {node.width}x{node.height}")
Checking if node is root:
>>> root = await drive.get_root() >>> is_root = node.parent_id is None
- class wcpan.drive.core.types.SourceConfig(*, name, file, snapshot)¶
Bases:
objectConfiguration for a single storage backend source.
Used with create_multi_drive() to configure one or more storage backends. In single-source mode, paths are unchanged. In multi-source mode (2+ sources), each source’s paths are prefixed with its name (e.g., /google/docs/file.txt).
- Parameters:
name (str)
file (CreateFileService)
snapshot (CreateSnapshotService)
- file¶
Factory function returning a FileService context manager.
- snapshot¶
Factory function returning a SnapshotService context manager.
- class wcpan.drive.core.types.MediaInfo(*, is_image=False, is_video=False, width=0, height=0, ms_duration=0)¶
Bases:
objectMedia metadata for image and video files.
Encapsulates dimension and duration information for media files. Use the factory methods image() or video() to create instances with appropriate metadata.
- Factory Methods:
image(width, height): Create metadata for an image file. video(width, height, ms_duration): Create metadata for a video file.
- Properties:
is_image: True if this represents image metadata. is_video: True if this represents video metadata. width: Width in pixels. height: Height in pixels. ms_duration: Duration in milliseconds (video only).
Example
Creating image metadata:
>>> media = MediaInfo.image(width=1920, height=1080) >>> await upload_file_from_local( ... drive, ... path, ... parent, ... mime_type="image/jpeg", ... media_info=media, ... )
Creating video metadata:
>>> media = MediaInfo.video(width=1920, height=1080, ms_duration=120000) >>> await upload_file_from_local( ... drive, ... path, ... parent, ... mime_type="video/mp4", ... media_info=media, ... )
- classmethod image(width, height)¶
Create MediaInfo for an image file.
- classmethod video(width, height, ms_duration)¶
Create MediaInfo for a video file.
Type Aliases¶
- wcpan.drive.core.types.ChangeAction = ChangeAction¶
Union of change actions returned by drive synchronization.
Represents either a node removal (RemoveAction) or node update (UpdateAction). Use type guard functions is_remove() and is_update() for type-safe handling.
Example
Using type guards:
>>> from wcpan.drive.core.lib import is_remove, is_update >>> async for change in drive.sync(): ... if is_remove(change): ... removed, node_id = change ... print(f"Removed: {node_id}") ... elif is_update(change): ... updated, node = change ... print(f"Updated: {node.name}")
Using dispatch helper:
>>> from wcpan.drive.core.lib import dispatch_change >>> result = dispatch_change( ... change, ... on_remove=lambda id: print(f"Removed {id}"), ... on_update=lambda node: print(f"Updated {node.name}"), ... )
- wcpan.drive.core.types.RemoveAction = RemoveAction¶
Change action representing a removed node.
A tuple of (True, node_id) indicating that the node with the given ID was removed from the drive.
Example
Pattern matching on RemoveAction:
>>> if change[0]: # RemoveAction ... removed, node_id = change ... print(f"Node removed: {node_id}")
- wcpan.drive.core.types.UpdateAction = UpdateAction¶
Change action representing an updated or added node.
A tuple of (False, node) indicating that the node was either newly created or had its metadata updated.
Example
Pattern matching on UpdateAction:
>>> if not change[0]: # UpdateAction ... updated, node = change ... print(f"Node updated: {node.name}")
- wcpan.drive.core.types.PrivateDict = PrivateDict¶
Service-specific metadata dictionary for custom attributes.
Implementations can store arbitrary string key-value pairs to support service-specific features not covered by the standard Node attributes. For example, sharing permissions, labels, or custom properties.
- wcpan.drive.core.types.CreateFileService = CreateFileService¶
Factory type for creating FileService instances.
See CreateService for details.
- wcpan.drive.core.types.CreateFileServiceMiddleware = CreateFileServiceMiddleware¶
Middleware factory type for wrapping FileService instances.
See CreateServiceMiddleware for details.
- wcpan.drive.core.types.CreateSnapshotService = CreateSnapshotService¶
Factory type for creating SnapshotService instances.
See CreateService for details.
- wcpan.drive.core.types.CreateSnapshotServiceMiddleware = CreateSnapshotServiceMiddleware¶
Middleware factory type for wrapping SnapshotService instances.
See CreateServiceMiddleware for details.