wcpan.drive.core.types

Drive Interface

class wcpan.drive.core.types.Drive

Bases: object

High-level interface for cloud drive operations.

Drive is the main user-facing API for interacting with cloud storage. It combines FileService (cloud API) and SnapshotService (local cache) to provide efficient, feature-rich operations.

Key Features:
  • Path-based and ID-based node access

  • Directory traversal and tree walking

  • File upload/download with resumable support

  • Node manipulation (create, move, rename, delete)

  • Change synchronization

  • Trash management

Usage Pattern:

Create Drive instances using create_drive() as an async context manager. The Drive interface ensures proper initialization and cleanup.

Authentication:

Most operations require authentication. Use is_authenticated() to check and authenticate() to establish credentials.

Thread Safety:

Drive operations are async-safe but instances should not be shared across concurrent tasks without external synchronization.

Example

Basic usage pattern:

>>> async with create_drive(
...     file=create_file_service,
...     snapshot=create_snapshot_service,
... ) as drive:
...     # Check authentication
...     if not await drive.is_authenticated():
...         await drive.authenticate()
...
...     # Sync to get latest changes
...     async for change in drive.sync():
...         print(f"Change: {change}")
...
...     # Navigate filesystem
...     root = await drive.get_root()
...     children = await drive.get_children(root)
...
...     # Upload a file
...     async with drive.upload_file("doc.txt", root, size=1024) as f:
...         await f.write(b"Hello World")
...         await f.flush()
...         node = await f.node()
abstractmethod async get_root()

Get the root directory node.

Returns the root node of the drive’s directory structure.

Returns:

Node representing the root directory.

Raises:

NodeNotFoundError – If not yet synced (cache empty).

Return type:

Node

Example

Getting root and listing contents:

>>> root = await drive.get_root()
>>> print(f"Root ID: {root.id}")
>>> children = await drive.get_children(root)
abstractmethod async get_node_by_id(node_id)

Get a node by its unique identifier.

Fast lookup using the node’s ID, retrieved from cache.

Parameters:

node_id (str) – The node’s unique ID.

Returns:

The Node with the given ID.

Raises:

NodeNotFoundError – If node not found in cache.

Return type:

Node

Example

Getting node by ID:

>>> node = await drive.get_node_by_id("abc123")
>>> print(f"Found: {node.name}")
abstractmethod async get_node_by_path(path)

Get a node by its absolute path.

Resolves the path through the cached directory structure.

Parameters:

path (PurePath) – Absolute path to the node (must start with “/”).

Returns:

The Node at the given path.

Raises:
Return type:

Node

Example

Getting node by path:

>>> from pathlib import PurePath
>>> node = await drive.get_node_by_path(PurePath("/docs/file.txt"))
>>> print(f"Size: {node.size} bytes")
abstractmethod async resolve_path(node)

Resolve the absolute path of a node.

Constructs the full path by walking up the parent chain.

Parameters:

node (Node) – The Node to resolve.

Returns:

Absolute PurePath to the node.

Return type:

PurePath

Example

Resolving node path:

>>> node = await drive.get_node_by_id("abc123")
>>> path = await drive.resolve_path(node)
>>> print(f"Path: {path}")
abstractmethod async get_child_by_name(name, parent)

Find a child node by name under a parent.

Searches for a direct child with the given name.

Parameters:
  • name (str) – The child’s name to find.

  • parent (Node) – The parent Node to search under.

Returns:

The child Node with matching name.

Raises:

NodeNotFoundError – If no child with that name exists.

Return type:

Node

Example

Finding child by name:

>>> root = await drive.get_root()
>>> docs = await drive.get_child_by_name("Documents", root)
abstractmethod async get_children(parent)

Get all direct children of a directory.

Returns both files and subdirectories under the parent.

Parameters:

parent (Node) – The parent directory Node.

Returns:

List of child Nodes (may be empty).

Return type:

list[Node]

Example

Listing directory contents:

>>> root = await drive.get_root()
>>> children = await drive.get_children(root)
>>> for child in children:
...     type_ = "DIR" if child.is_directory else "FILE"
...     print(f"{type_}: {child.name}")
abstractmethod async get_trashed_nodes(flatten=False)

Get all nodes in the trash.

Parameters:

flatten (bool) – If False (default), exclude nodes whose parent is also trashed. If True, return all trashed nodes.

Returns:

List of trashed Nodes (may be empty).

Return type:

list[Node]

Example

Listing trash contents:

>>> trashed = await drive.get_trashed_nodes()
>>> for node in trashed:
...     print(f"Trashed: {node.name}")

Getting all trashed nodes including nested:

>>> all_trashed = await drive.get_trashed_nodes(flatten=True)
abstractmethod async find_nodes_by_regex(pattern)

Find nodes whose names match a regex pattern.

Searches the entire drive for nodes with names matching the given regular expression.

Parameters:

pattern (str) – Regular expression pattern to match.

Returns:

List of matching Nodes (may be empty).

Return type:

list[Node]

Example

Finding all Python files:

>>> nodes = await drive.find_nodes_by_regex(r".*\.py$")
>>> for node in nodes:
...     print(f"Python file: {node.name}")
abstractmethod walk(node, *, include_trashed=False)

Recursively traverse directory tree from a starting node.

Similar to os.walk(), yields directory information in depth-first order.

Parameters:
  • node (Node) – Starting directory Node for traversal.

  • include_trashed (bool) – If True, include trashed nodes in traversal.

Yields:

Tuples of (directory, subdirectories, files) where – - directory: Current directory Node being visited - subdirectories: List of direct subdirectory Nodes - files: List of direct file Nodes

Return type:

AsyncIterator[tuple[Node, list[Node], list[Node]]]

Example

Walking entire drive:

>>> root = await drive.get_root()
>>> async for directory, subdirs, files in drive.walk(root):
...     print(f"Directory: {directory.name}")
...     for file in files:
...         print(f"  File: {file.name} ({file.size} bytes)")

Finding all images:

>>> root = await drive.get_root()
>>> async for directory, _, files in drive.walk(root):
...     for file in files:
...         if file.is_image:
...             path = await drive.resolve_path(file)
...             print(f"Image: {path}")
abstractmethod async create_directory(name, parent, *, exist_ok=False)

Create a new directory.

Creates a directory with the given name under the parent.

Parameters:
  • name (str) – Name for the new directory.

  • parent (Node) – Parent directory Node where directory will be created.

  • exist_ok (bool) – If True, don’t raise error if directory already exists.

Returns:

Node representing the created (or existing if exist_ok) directory.

Raises:
Return type:

Node

Example

Creating a directory:

>>> root = await drive.get_root()
>>> docs = await drive.create_directory("Documents", root)
>>> print(f"Created: {docs.name} ({docs.id})")

Creating with exist_ok:

>>> docs = await drive.create_directory(
...     "Documents",
...     root,
...     exist_ok=True,
... )
abstractmethod download_file(node)

Download a file from the cloud drive.

Returns an async context manager that yields a ReadableFile for streaming the file content.

Parameters:

node (Node) – The file Node to download.

Returns:

Async context manager yielding ReadableFile.

Raises:
Return type:

AbstractAsyncContextManager[ReadableFile]

Example

Downloading a file:

>>> node = await drive.get_node_by_path(PurePath("/file.txt"))
>>> async with drive.download_file(node) as file:
...     content = await file.read(1024)
...     while content:
...         process(content)
...         content = await file.read(1024)

Using higher-level helper:

>>> from wcpan.drive.core.lib import download_file_to_local
>>> from pathlib import Path
>>> local_path = await download_file_to_local(
...     drive,
...     node,
...     Path("/downloads"),
... )
abstractmethod upload_file(name, parent, *, size=None, mime_type=None, media_info=None)

Upload a file to the cloud drive.

Returns an async context manager that yields a WritableFile for streaming content to the cloud.

Parameters:
  • name (str) – Name for the uploaded file.

  • parent (Node) – Parent directory Node where file will be uploaded.

  • size (int | None) – File size in bytes, or None if unknown.

  • mime_type (str | None) – MIME type string, or None for default.

  • media_info (MediaInfo | None) – Optional MediaInfo for images/videos.

Returns:

Async context manager yielding WritableFile.

Raises:
Return type:

AbstractAsyncContextManager[WritableFile]

Example

Uploading a file:

>>> root = await drive.get_root()
>>> async with drive.upload_file("doc.txt", root, size=11) as file:
...     await file.write(b"Hello World")
...     await file.flush()
...     node = await file.node()
...     print(f"Uploaded: {node.id}")

Using higher-level helper:

>>> from wcpan.drive.core.lib import upload_file_from_local
>>> from pathlib import Path
>>> node = await upload_file_from_local(
...     drive,
...     Path("/local/file.jpg"),
...     parent,
...     mime_type="image/jpeg",
... )
abstractmethod async purge_trash()

Permanently delete all items in the trash.

Empties the trash bin, permanently removing all trashed nodes. This operation cannot be undone.

Raises:

AuthenticationError – If not authenticated.

Return type:

None

Example

Purging trash:

>>> await drive.purge_trash()
>>> trashed = await drive.get_trashed_nodes()
>>> assert len(trashed) == 0
abstractmethod async move(node, *, new_parent=None, new_name=None)

Relocate or rename a node.

At least one parameter must be non-None.

Parameters:
  • node (Node) – The Node to modify.

  • new_parent (Node | None) – New parent directory, or None to keep current.

  • new_name (str | None) – New name, or None to keep current.

Returns:

New Node instance with updated properties.

Raises:
  • AuthenticationError – If not authenticated.

  • ValueError – If all parameters are None, or if trying to move root node, or if new_name contains invalid characters.

Return type:

Node

abstractmethod async delete(node, *, permanent=False)

Delete a node.

Soft-deletes by default (best-effort per backend). Pass permanent=True for a hard delete that cannot be undone.

Parameters:
  • node (Node) – The Node to delete.

  • permanent (bool) – If True, permanently delete. If False (default), soft-delete (e.g. move to trash).

Raises:

AuthenticationError – If not authenticated.

Return type:

None

abstractmethod async restore(node)

Restore a trashed node.

Parameters:

node (Node) – The trashed Node to restore.

Returns:

Updated Node instance reflecting the restored state.

Raises:

NotImplementedError – If backend doesn’t support restore.

Return type:

Node

abstractmethod sync()

Synchronize the local cache with cloud storage.

Fetches changes from the cloud and updates the local snapshot. This is the ONLY operation that modifies the cached snapshot.

Should be called periodically or before operations that require up-to-date metadata.

Yields:

ChangeAction instances for each change detected.

Raises:

AuthenticationError – If not authenticated.

Return type:

AsyncIterator[ChangeAction]

Example

Basic sync:

>>> async for change in drive.sync():
...     if change[0]:  # RemoveAction
...         print(f"Removed: {change[1]}")
...     else:  # UpdateAction
...         print(f"Updated: {change[1].name}")

Sync with type guards:

>>> from wcpan.drive.core.lib import is_remove, is_update
>>> async for change in drive.sync():
...     if is_remove(change):
...         removed, node_id = change
...         handle_removal(node_id)
...     elif is_update(change):
...         updated, node = change
...         handle_update(node)
abstractmethod async get_hasher_factory(node)

Get a factory for creating hash calculators for a given node.

Returns a factory function compatible with the hash algorithm used by the backend that owns the node.

Parameters:

node (Node) – The node whose backend’s hasher factory is returned.

Returns:

Factory function for creating Hasher instances.

Return type:

CreateHasher

Example

Computing file hash:

>>> hasher_factory = await drive.get_hasher_factory(node)
>>> hasher = await hasher_factory()
>>> with open("file.txt", "rb") as f:
...     while chunk := f.read(8192):
...         await hasher.update(chunk)
>>> file_hash = await hasher.hexdigest()
abstractmethod async is_authenticated()

Check if the drive is authenticated and ready for operations.

Returns True if authenticated, False otherwise. Does not perform I/O - checks cached authentication state.

Returns:

True if authenticated, False otherwise.

Return type:

bool

Example

Checking before operations:

>>> if await drive.is_authenticated():
...     await drive.sync()
... else:
...     print("Not authenticated")
abstractmethod async authenticate()

Authenticate the drive for use.

Establishes authentication credentials. The specific mechanism depends on the underlying FileService implementation.

This method is NOT interactive. OAuth implementations should handle token exchange separately.

Raises:

AuthenticationError – If authentication fails.

Return type:

None

Example

Authenticating before use:

>>> async with create_drive(...) as drive:
...     await drive.authenticate()
...     await drive.sync()

Conditional authentication:

>>> if not await drive.is_authenticated():
...     await drive.authenticate()
>>> await drive.upload_file(...)

Service Interfaces

class wcpan.drive.core.types.FileService

Bases: Service

Backend implementation interface for cloud storage operations.

FileService defines the contract for interacting with cloud storage APIs. Implementations provide the actual I/O operations for a specific cloud provider (e.g., Google Drive, Dropbox, OneDrive).

Implementations must:
  • Provide all abstract methods

  • Return api_version = 5

  • Handle authentication appropriately

  • Support async context manager protocol

  • Raise appropriate exceptions on errors

Used With:

Paired with a SnapshotService and passed to create_drive().

Thread Safety:

Implementations should handle concurrent operations safely.

Example

Basic implementation structure:

>>> class MyCloudService(FileService):
...     @property
...     def api_version(self) -> int:
...         return 5
...
...     async def get_root(self) -> Node:
...         # Fetch root from API
...         pass
...
...     # ... implement other abstract methods ...

Using with create_drive:

>>> @asynccontextmanager
>>> async def create_my_service():
...     service = MyCloudService(credentials)
...     try:
...         yield service
...     finally:
...         await service.close()
>>>
>>> async with create_drive(
...     file=create_my_service,
...     snapshot=create_snapshot,
... ) as drive:
...     await drive.sync()
abstractmethod async get_initial_cursor()

Get the initial checkpoint cursor for change tracking.

Returns a cursor representing the current state of the cloud drive before any changes are tracked. Used as the starting point for the first sync operation.

Returns:

Initial cursor string. Format is implementation-specific.

Return type:

str

Example

Implementation pattern:

>>> async def get_initial_cursor(self) -> str:
...     # For some APIs, this might be a special token
...     return "start_token"
...     # For others, might be current timestamp
...     return str(int(time.time()))
abstractmethod async get_root()

Fetch the root directory node from the cloud storage.

Retrieves the root directory’s metadata from the cloud API and returns it as a Node instance. This is typically cached by the SnapshotService after the first call.

Returns:

Node representing the root directory.

Raises:

AuthenticationError – If not authenticated.

Return type:

Node

Example

Implementation pattern:

>>> async def get_root(self) -> Node:
...     response = await self.api_client.get("/root")
...     return self._api_response_to_node(response)
abstractmethod get_changes(cursor)

Fetch incremental changes from the cloud storage.

Returns an async iterator that yields pages of changes since the given cursor. Each iteration provides a list of changes and the cursor for the next page.

This method implements polling or pagination to retrieve all changes since the given checkpoint.

Parameters:

cursor (str) – Starting cursor from previous sync or get_initial_cursor().

Yields:

Tuples of (changes, next_cursor) where – - changes: List of ChangeAction (additions/updates/removals) - next_cursor: Cursor for the next page or next sync

Return type:

AsyncIterator[tuple[list[ChangeAction], str]]

Example

Implementation pattern:

>>> async def get_changes(
...     self,
...     cursor: str,
... ) -> AsyncIterator[tuple[list[ChangeAction], str]]:
...     while True:
...         response = await self.api_client.get_changes(cursor)
...         changes = [self._convert_change(c) for c in response.changes]
...         yield changes, response.next_cursor
...         if not response.has_more:
...             break
...         cursor = response.next_cursor

Usage in sync:

>>> async for changes, next_cursor in service.get_changes(cursor):
...     await snapshot.apply_changes(changes, next_cursor)
abstractmethod async move(node, *, new_parent, new_name)

Relocate or rename a node.

Performs one or more operations on the node: - Rename: Set new_name to change the node’s name - Move: Set new_parent to move to a different directory

At least one of the optional parameters must be non-None.

Parameters:
  • node (Node) – The Node to modify.

  • new_parent (Node | None) – New parent directory, or None to keep current parent.

  • new_name (str | None) – New name, or None to keep current name.

Returns:

New Node instance reflecting the changes.

Raises:
Return type:

Node

abstractmethod async delete(node, *, permanent=False)

Delete a node.

Soft-deletes by default (best-effort per backend). Pass permanent=True for a hard delete that cannot be undone.

Parameters:
  • node (Node) – The Node to delete.

  • permanent (bool) – If True, permanently delete. If False (default), soft-delete (e.g. move to trash).

Raises:

AuthenticationError – If not authenticated.

Return type:

None

abstractmethod async restore(node)

Restore a trashed node.

Parameters:

node (Node) – The trashed Node to restore.

Returns:

Updated Node instance reflecting the restored state.

Raises:

NotImplementedError – If backend doesn’t support restore.

Return type:

Node

abstractmethod async purge_trash()

Permanently delete all trashed items.

Empties the trash bin, permanently removing all trashed nodes. This operation cannot be undone.

Raises:

AuthenticationError – If not authenticated.

Return type:

None

Example

Implementation pattern:

>>> async def purge_trash(self) -> None:
...     await self.api_client.empty_trash()
abstractmethod async create_directory(name, parent, *, exist_ok, private)

Create a new directory.

Creates a directory with the given name under the specified parent. Behavior when the directory exists depends on exist_ok.

Parameters:
  • name (str) – Name for the new directory.

  • parent (Node) – Parent directory Node.

  • exist_ok (bool) – If True, don’t raise error if directory exists. If False, raise NodeExistsError if directory exists.

  • private (PrivateDict | None) – Optional service-specific metadata.

Returns:

Node representing the created (or existing) directory.

Raises:
Return type:

Node

Example

Implementation pattern:

>>> async def create_directory(
...     self,
...     name: str,
...     parent: Node,
...     *,
...     exist_ok: bool,
...     private: PrivateDict | None,
... ) -> Node:
...     if not exist_ok:
...         existing = await self._find_child(parent.id, name)
...         if existing:
...             raise NodeExistsError(existing)
...     response = await self.api_client.create_folder(
...         name=name,
...         parent_id=parent.id,
...     )
...     return self._api_response_to_node(response)
abstractmethod download_file(node)

Create a readable stream for downloading a file.

Returns an async context manager that yields a ReadableFile for streaming the file content from cloud storage.

Parameters:

node (Node) – The file Node to download.

Returns:

Async context manager yielding ReadableFile.

Raises:
Return type:

AbstractAsyncContextManager[ReadableFile]

Example

Implementation pattern:

>>> @asynccontextmanager
>>> async def download_file(
...     self,
...     node: Node,
... ) -> AsyncIterator[ReadableFile]:
...     stream = await self.api_client.download(node.id)
...     try:
...         yield MyReadableFile(stream, node)
...     finally:
...         await stream.close()
abstractmethod upload_file(name, parent, *, size, mime_type, media_info, private)

Create a writable stream for uploading a file.

Returns an async context manager that yields a WritableFile for streaming content to cloud storage.

Parameters:
  • name (str) – Name for the uploaded file.

  • parent (Node) – Parent directory Node.

  • size (int | None) – File size in bytes, or None if unknown.

  • mime_type (str | None) – MIME type string, or None for default.

  • media_info (MediaInfo | None) – Optional media metadata for images/videos.

  • private (PrivateDict | None) – Optional service-specific metadata.

Returns:

Async context manager yielding WritableFile.

Raises:
Return type:

AbstractAsyncContextManager[WritableFile]

Example

Implementation pattern:

>>> @asynccontextmanager
>>> async def upload_file(
...     self,
...     name: str,
...     parent: Node,
...     *,
...     size: int | None,
...     mime_type: str | None,
...     media_info: MediaInfo | None,
...     private: PrivateDict | None,
... ) -> AsyncIterator[WritableFile]:
...     stream = await self.api_client.upload_start(
...         name=name,
...         parent_id=parent.id,
...         size=size,
...     )
...     try:
...         yield MyWritableFile(stream)
...     finally:
...         await stream.finalize()
abstractmethod async get_hasher_factory()

Get a factory for creating hash calculators.

Returns a factory function that creates Hasher instances compatible with this service’s hash algorithm (e.g., MD5, SHA-1, SHA-256).

Returns:

Factory function for creating Hasher instances.

Return type:

CreateHasher

Example

Implementation pattern:

>>> async def get_hasher_factory(self) -> CreateHasher:
...     async def create_hasher() -> Hasher:
...         return MD5Hasher()
...     return create_hasher
abstractmethod async is_authenticated()

Check if the service is authenticated and ready for operations.

Checks cached authentication state without performing I/O if possible. Should verify that credentials/tokens are present and not obviously expired.

Returns:

True if authenticated and ready, False otherwise.

Return type:

bool

Example

Implementation pattern:

>>> async def is_authenticated(self) -> bool:
...     return self.access_token is not None
abstractmethod async authenticate()

Authenticate the service for use.

Establishes authentication using implementation-specific mechanisms (OAuth tokens, API keys, etc.). Implementations must manage their own credential storage and token refresh.

This method is NOT interactive. OAuth implementations should handle token exchange separately before calling this method.

Raises:

AuthenticationError – If authentication fails.

Return type:

None

Example

Implementation pattern:

>>> async def authenticate(self) -> None:
...     if not self.credentials:
...         raise AuthenticationError("No credentials configured")
...     response = await self.api_client.validate_token(
...         self.credentials.access_token
...     )
...     if not response.valid:
...         raise AuthenticationError("Invalid credentials")
class wcpan.drive.core.types.SnapshotService

Bases: Service

Local cache interface for node metadata and change tracking.

SnapshotService defines the contract for maintaining a local cache of the cloud drive’s directory structure and file metadata. This enables: - Fast path lookups without API calls - Efficient change synchronization - Offline access to metadata

Implementations typically use:
  • SQLite database for persistence

  • In-memory structures for performance

  • Transaction support for atomic updates

Used With:

Paired with a FileService and passed to create_drive().

Thread Safety:

Implementations should handle concurrent access safely.

Example

Basic implementation structure:

>>> class SQLiteSnapshot(SnapshotService):
...     @property
...     def api_version(self) -> int:
...         return 5
...
...     async def get_root(self) -> Node:
...         # Query from database
...         pass
...
...     async def apply_changes(
...         self,
...         changes: list[ChangeAction],
...         cursor: str,
...     ) -> None:
...         # Update database in transaction
...         pass
...
...     # ... implement other abstract methods ...
abstractmethod async get_root()

Get the root directory node from the cache.

Retrieves the cached root node. This should be fast and not involve I/O to the cloud service.

Returns:

Cached Node representing the root directory.

Raises:

NodeNotFoundError – If root not yet cached (need to sync first).

Return type:

Node

Example

Implementation pattern:

>>> async def get_root(self) -> Node:
...     row = await self.db.fetch_one("SELECT * FROM nodes WHERE parent_id IS NULL")
...     if not row:
...         raise NodeNotFoundError("root")
...     return self._row_to_node(row)
abstractmethod async set_root(node)

Store the root directory node in the cache.

Called during the first sync to initialize the cache with the root node metadata.

Parameters:

node (Node) – The root Node to cache.

Return type:

None

Example

Implementation pattern:

>>> async def set_root(self, node: Node) -> None:
...     await self.db.execute(
...         "INSERT OR REPLACE INTO nodes VALUES (...)",
...         self._node_to_values(node),
...     )
abstractmethod async get_current_cursor()

Get the current synchronization cursor.

Returns the cursor from the most recent sync operation. If no sync has occurred yet (first run), returns an empty string.

Returns:

Current cursor string, or empty string if never synced.

Return type:

str

Example

Implementation pattern:

>>> async def get_current_cursor(self) -> str:
...     row = await self.db.fetch_one("SELECT cursor FROM metadata")
...     return row["cursor"] if row else ""
abstractmethod async get_node_by_id(node_id)

Retrieve a cached node by its ID.

Fast lookup of node metadata from the local cache using the node’s unique identifier.

Parameters:

node_id (str) – The node’s unique ID.

Returns:

The cached Node.

Raises:

NodeNotFoundError – If node not found in cache.

Return type:

Node

Example

Implementation pattern:

>>> async def get_node_by_id(self, node_id: str) -> Node:
...     row = await self.db.fetch_one(
...         "SELECT * FROM nodes WHERE id = ?",
...         node_id,
...     )
...     if not row:
...         raise NodeNotFoundError(node_id)
...     return self._row_to_node(row)
abstractmethod async get_node_by_path(path)

Resolve a node by its absolute path.

Traverses the cached directory structure to find the node at the given path.

Parameters:

path (PurePath) – Absolute path to the node.

Returns:

The Node at the given path.

Raises:

NodeNotFoundError – If path does not exist in cache.

Return type:

Node

Example

Implementation pattern:

>>> async def get_node_by_path(self, path: PurePath) -> Node:
...     node = await self.get_root()
...     for part in path.parts[1:]:  # Skip root "/"
...         node = await self.get_child_by_name(part, node.id)
...     return node
abstractmethod async resolve_path_by_id(node_id)

Resolve the absolute path for a node ID.

Walks up the parent chain to construct the full path for the given node.

Parameters:

node_id (str) – The node’s unique ID.

Returns:

Absolute PurePath to the node.

Raises:

NodeNotFoundError – If node not found in cache.

Return type:

PurePath

Example

Implementation pattern:

>>> async def resolve_path_by_id(self, node_id: str) -> PurePath:
...     parts = []
...     node = await self.get_node_by_id(node_id)
...     while node.parent_id:
...         parts.insert(0, node.name)
...         node = await self.get_node_by_id(node.parent_id)
...     return PurePath("/", *parts)
abstractmethod async get_child_by_name(name, parent_id)

Find a child node by name under a parent.

Searches for a direct child with the given name under the specified parent node.

Parameters:
  • name (str) – The child’s name.

  • parent_id (str) – The parent node’s ID.

Returns:

The child Node.

Raises:

NodeNotFoundError – If child not found.

Return type:

Node

Example

Implementation pattern:

>>> async def get_child_by_name(self, name: str, parent_id: str) -> Node:
...     row = await self.db.fetch_one(
...         "SELECT * FROM nodes WHERE parent_id = ? AND name = ?",
...         parent_id,
...         name,
...     )
...     if not row:
...         raise NodeNotFoundError(name)
...     return self._row_to_node(row)
abstractmethod async get_children_by_id(parent_id)

Get all direct children of a parent node.

Retrieves the list of immediate children (both files and directories) under the specified parent.

Parameters:

parent_id (str) – The parent node’s ID.

Returns:

List of child Nodes (may be empty).

Return type:

list[Node]

Example

Implementation pattern:

>>> async def get_children_by_id(self, parent_id: str) -> list[Node]:
...     rows = await self.db.fetch_all(
...         "SELECT * FROM nodes WHERE parent_id = ? ORDER BY name",
...         parent_id,
...     )
...     return [self._row_to_node(row) for row in rows]
abstractmethod async get_trashed_nodes()

Get all nodes currently in the trash.

Retrieves all nodes marked as trashed from the cache.

Returns:

List of trashed Nodes (may be empty).

Return type:

list[Node]

Example

Implementation pattern:

>>> async def get_trashed_nodes(self) -> list[Node]:
...     rows = await self.db.fetch_all(
...         "SELECT * FROM nodes WHERE is_trashed = 1"
...     )
...     return [self._row_to_node(row) for row in rows]
abstractmethod async apply_changes(changes, cursor)

Apply change actions to the cache and update cursor.

Processes a list of changes (additions, updates, removals) and updates the cache accordingly. Should be atomic - either all changes apply successfully or none do.

Parameters:
  • changes (list[ChangeAction]) – List of ChangeAction to apply.

  • cursor (str) – New cursor value to store after applying changes.

Return type:

None

Example

Implementation pattern:

>>> async def apply_changes(
...     self,
...     changes: list[ChangeAction],
...     cursor: str,
... ) -> None:
...     async with self.db.transaction():
...         for change in changes:
...             if change[0]:  # RemoveAction
...                 await self.db.execute(
...                     "DELETE FROM nodes WHERE id = ?",
...                     change[1],
...                 )
...             else:  # UpdateAction
...                 node = change[1]
...                 await self.db.execute(
...                     "INSERT OR REPLACE INTO nodes VALUES (...)",
...                     self._node_to_values(node),
...                 )
...         await self.db.execute(
...             "UPDATE metadata SET cursor = ?",
...             cursor,
...         )
abstractmethod async find_nodes_by_regex(pattern)

Find nodes whose names match a regex pattern.

Searches the cache for nodes with names matching the given regular expression pattern.

Parameters:

pattern (str) – Regular expression pattern to match against node names.

Returns:

List of matching Nodes (may be empty).

Return type:

list[Node]

Example

Implementation pattern:

>>> async def find_nodes_by_regex(self, pattern: str) -> list[Node]:
...     import re
...     regex = re.compile(pattern)
...     rows = await self.db.fetch_all("SELECT * FROM nodes")
...     nodes = [self._row_to_node(row) for row in rows]
...     return [n for n in nodes if regex.search(n.name)]
class wcpan.drive.core.types.ReadableFile

Bases: AsyncIterable[bytes]

Async readable file interface for downloading cloud files.

Provides sequential and seekable read access to cloud file content. Returned by Drive.download_file() and FileService.download_file() as an async context manager.

This interface extends AsyncIterable[bytes], allowing iteration over file chunks.

Thread Safety:

Not thread-safe. Each ReadableFile instance should be used from a single async task.

Example

Basic download pattern:

>>> async with drive.download_file(node) as file:
...     data = await file.read(8192)
...     while data:
...         process(data)
...         data = await file.read(8192)

Using as async iterator:

>>> async with drive.download_file(node) as file:
...     async for chunk in file:
...         process(chunk)

Seeking and resuming:

>>> async with drive.download_file(node) as file:
...     await file.seek(1024)  # Skip first 1KB
...     data = await file.read(4096)
abstractmethod async read(length)

Read at most length bytes from the file.

Reads up to the specified number of bytes from the current position. Returns fewer bytes if EOF is reached.

Parameters:

length (int) – Maximum number of bytes to read.

Returns:

Bytes read from the file. Empty bytes object indicates EOF.

Return type:

bytes

Example

Reading in chunks:

>>> chunk = await file.read(8192)
>>> if chunk:
...     print(f"Read {len(chunk)} bytes")
abstractmethod async seek(offset)

Seek to the specified position in the file.

Moves the file pointer to the given absolute offset from the beginning of the file. Subsequent reads will start from this position.

Parameters:

offset (int) – Absolute byte offset from the beginning of the file.

Returns:

The new absolute position.

Return type:

int

Example

Resume download from specific position:

>>> await file.seek(1048576)  # Seek to 1MB
>>> data = await file.read(8192)
abstractmethod async node()

Get the Node being read.

Returns the Node instance representing the file being downloaded. Useful for accessing metadata during download.

Returns:

The Node instance for this file.

Return type:

Node

Example

Accessing node during download:

>>> async with drive.download_file(node) as file:
...     current_node = await file.node()
...     print(f"Downloading: {current_node.name}")
class wcpan.drive.core.types.WritableFile

Bases: object

Async writable file interface for uploading files to cloud storage.

Provides sequential and seekable write access for uploading content. Returned by Drive.upload_file() and FileService.upload_file() as an async context manager.

Supports resumable uploads through seek() and tell() for recovering from interruptions.

Thread Safety:

Not thread-safe. Each WritableFile instance should be used from a single async task.

Example

Basic upload pattern:

>>> async with drive.upload_file("file.txt", parent, size=1024) as file:
...     with open("local.txt", "rb") as f:
...         chunk = f.read(8192)
...         while chunk:
...             await file.write(chunk)
...             chunk = f.read(8192)
...     await file.flush()
...     node = await file.node()

Resumable upload:

>>> async with drive.upload_file("large.bin", parent, size=size) as file:
...     try:
...         # ... write data ...
...         await file.flush()
...     except TimeoutError:
...         offset = await file.tell()
...         await file.seek(offset)
...         # Continue from offset
abstractmethod async tell()

Get the current write position.

Returns the absolute byte offset of the current write position. Used for resumable uploads to determine how much data was successfully written before an interruption.

Returns:

Current byte offset from the beginning of the file.

Return type:

int

Example

Checking upload progress:

>>> bytes_written = await file.tell()
>>> print(f"Uploaded {bytes_written}/{total_size} bytes")
abstractmethod async seek(offset)

Seek to the specified position for resuming uploads.

Moves the write pointer to the given absolute offset from the beginning. Used to resume uploads after interruptions.

Parameters:

offset (int) – Absolute byte offset from the beginning.

Returns:

The new absolute position.

Return type:

int

Example

Resuming upload:

>>> last_position = await file.tell()
>>> # ... connection lost ...
>>> await file.seek(last_position)
>>> # Continue writing from last position
abstractmethod async write(chunk)

Write data to the upload stream.

Writes the given bytes to the file at the current position. May buffer data internally - call flush() to ensure data is sent to the server.

Parameters:

chunk (bytes) – Bytes to write.

Returns:

Number of bytes written to the buffer.

Return type:

int

Example

Writing with progress tracking:

>>> total_written = 0
>>> for chunk in chunks:
...     written = await file.write(chunk)
...     total_written += written
...     print(f"Progress: {total_written} bytes")
abstractmethod async flush()

Flush buffered data to the server.

Ensures all buffered data is sent to the cloud storage backend. Should be called before closing the file or checking upload status.

Example

Flushing before getting node:

>>> await file.write(data)
>>> await file.flush()
>>> node = await file.node()
Return type:

None

abstractmethod async node()

Get the uploaded Node.

Returns the Node instance representing the uploaded file. Should be called after flush() to ensure the upload is complete.

Returns:

The Node instance for the uploaded file.

Raises:

NodeNotFoundError – If upload failed or is incomplete.

Return type:

Node

Example

Getting uploaded node:

>>> async with drive.upload_file("doc.pdf", parent, size=size) as file:
...     await file.write(data)
...     await file.flush()
...     uploaded_node = await file.node()
...     print(f"Uploaded: {uploaded_node.id}")
class wcpan.drive.core.types.Hasher

Bases: object

Hash calculator for file integrity verification.

Provides streaming hash calculation compatible with the cloud service’s hash algorithm. Implementations must match the hashing method used by the specific cloud storage backend.

Supports incremental updates for efficient hashing of large files and can be copied mid-stream for checkpointing.

Example

Computing file hash:

>>> hasher_factory = await drive.get_hasher_factory()
>>> hasher = await hasher_factory()
>>> with open("file.txt", "rb") as f:
...     while chunk := f.read(8192):
...         await hasher.update(chunk)
>>> file_hash = await hasher.hexdigest()

Using copy for checkpointing:

>>> hasher1 = await hasher_factory()
>>> await hasher1.update(b"data")
>>> hasher2 = await hasher1.copy()
>>> await hasher1.update(b"more")
>>> await hasher2.update(b"different")
>>> hash1 = await hasher1.hexdigest()
>>> hash2 = await hasher2.hexdigest()
abstractmethod async update(data)

Feed data into the hash calculation.

Updates the hash with the provided data. Can be called multiple times to process data incrementally.

Parameters:

data (bytes) – Bytes to include in the hash calculation.

Return type:

None

Example

Incremental hashing:

>>> await hasher.update(b"Hello ")
>>> await hasher.update(b"World")
>>> digest = await hasher.hexdigest()
abstractmethod async digest()

Get the raw binary hash digest.

Returns the hash as raw bytes. The length depends on the hash algorithm (e.g., 16 bytes for MD5, 32 bytes for SHA-256).

Returns:

Raw binary hash digest.

Return type:

bytes

Example

Getting binary digest:

>>> raw_hash = await hasher.digest()
>>> print(f"Hash length: {len(raw_hash)} bytes")
abstractmethod async hexdigest()

Get the hexadecimal hash digest.

Returns the hash as a lowercase hexadecimal string, which is typically used for comparison with Node.hash values.

Returns:

Hexadecimal string representation of the hash.

Return type:

str

Example

Comparing with node hash:

>>> computed = await hasher.hexdigest()
>>> if computed == node.hash:
...     print("Hash matches!")
abstractmethod async copy()

Create an independent copy of this hasher.

Returns a new Hasher instance with the same internal state, allowing divergent hash calculations from the same checkpoint.

Returns:

New Hasher instance with copied state.

Return type:

Self

Example

Checkpointing hash calculation:

>>> checkpoint = await hasher.copy()
>>> await hasher.update(b"path1")
>>> await checkpoint.update(b"path2")

Data Classes

class wcpan.drive.core.types.Node(*, id, parent_id, name, is_directory, is_trashed, ctime, mtime, mime_type, hash, size, is_image, is_video, width, height, ms_duration, private)

Bases: object

Immutable representation of a file or directory in the cloud drive.

Node is the core data structure representing both files and directories. All attributes are immutable - to modify a node, use Drive methods which return new Node instances with updated values.

Parameters:
id

Unique identifier for this node assigned by the service.

Type:

str

parent_id

ID of the parent directory, or None for root node.

Type:

str | None

name

Name of the file or directory (without path).

Type:

str

is_directory

True if this node represents a directory.

Type:

bool

is_trashed

True if this node is in the trash.

Type:

bool

ctime

Creation timestamp.

Type:

datetime.datetime

mtime

Last modification timestamp.

Type:

datetime.datetime

mime_type

MIME type string (e.g., “image/jpeg”, “video/mp4”).

Type:

str

hash

Content hash for files, empty string for directories.

Type:

str

size

File size in bytes, 0 for directories.

Type:

int

is_image

True if this is an image file with media metadata.

Type:

bool

is_video

True if this is a video file with media metadata.

Type:

bool

width

Image/video width in pixels, 0 if not applicable.

Type:

int

height

Image/video height in pixels, 0 if not applicable.

Type:

int

ms_duration

Video duration in milliseconds, 0 if not applicable.

Type:

int

private

Service-specific metadata, or None if not used.

Type:

wcpan.drive.core.types.PrivateDict | None

Note

Node is frozen (immutable). To change properties, use Drive.move() or other Drive methods which return new Node instances.

Example

Accessing node properties:

>>> node = await drive.get_node_by_path(PurePath("/photos/image.jpg"))
>>> print(f"{node.name}: {node.size} bytes")
>>> if node.is_image:
...     print(f"Dimensions: {node.width}x{node.height}")

Checking if node is root:

>>> root = await drive.get_root()
>>> is_root = node.parent_id is None
class wcpan.drive.core.types.SourceConfig(*, name, file, snapshot)

Bases: object

Configuration for a single storage backend source.

Used with create_multi_drive() to configure one or more storage backends. In single-source mode, paths are unchanged. In multi-source mode (2+ sources), each source’s paths are prefixed with its name (e.g., /google/docs/file.txt).

Parameters:
name

Source name used as path prefix in multi-source mode.

Type:

str

file

Factory function returning a FileService context manager.

Type:

wcpan.drive.core.types.CreateFileService

snapshot

Factory function returning a SnapshotService context manager.

Type:

wcpan.drive.core.types.CreateSnapshotService

class wcpan.drive.core.types.MediaInfo(*, is_image=False, is_video=False, width=0, height=0, ms_duration=0)

Bases: object

Media metadata for image and video files.

Encapsulates dimension and duration information for media files. Use the factory methods image() or video() to create instances with appropriate metadata.

Factory Methods:

image(width, height): Create metadata for an image file. video(width, height, ms_duration): Create metadata for a video file.

Properties:

is_image: True if this represents image metadata. is_video: True if this represents video metadata. width: Width in pixels. height: Height in pixels. ms_duration: Duration in milliseconds (video only).

Example

Creating image metadata:

>>> media = MediaInfo.image(width=1920, height=1080)
>>> await upload_file_from_local(
...     drive,
...     path,
...     parent,
...     mime_type="image/jpeg",
...     media_info=media,
... )

Creating video metadata:

>>> media = MediaInfo.video(width=1920, height=1080, ms_duration=120000)
>>> await upload_file_from_local(
...     drive,
...     path,
...     parent,
...     mime_type="video/mp4",
...     media_info=media,
... )
Parameters:
classmethod image(width, height)

Create MediaInfo for an image file.

Parameters:
  • width (int) – Image width in pixels.

  • height (int) – Image height in pixels.

Returns:

MediaInfo instance with is_image=True.

Return type:

Self

classmethod video(width, height, ms_duration)

Create MediaInfo for a video file.

Parameters:
  • width (int) – Video width in pixels.

  • height (int) – Video height in pixels.

  • ms_duration (int) – Video duration in milliseconds.

Returns:

MediaInfo instance with is_video=True.

Return type:

Self

property is_image: bool

True if this represents image metadata.

property is_video: bool

True if this represents video metadata.

property width: int

Width in pixels, or 0 if not applicable.

property height: int

Height in pixels, or 0 if not applicable.

property ms_duration: int

Duration in milliseconds for video, or 0 if not applicable.

Type Aliases

wcpan.drive.core.types.ChangeAction = ChangeAction

Union of change actions returned by drive synchronization.

Represents either a node removal (RemoveAction) or node update (UpdateAction). Use type guard functions is_remove() and is_update() for type-safe handling.

Example

Using type guards:

>>> from wcpan.drive.core.lib import is_remove, is_update
>>> async for change in drive.sync():
...     if is_remove(change):
...         removed, node_id = change
...         print(f"Removed: {node_id}")
...     elif is_update(change):
...         updated, node = change
...         print(f"Updated: {node.name}")

Using dispatch helper:

>>> from wcpan.drive.core.lib import dispatch_change
>>> result = dispatch_change(
...     change,
...     on_remove=lambda id: print(f"Removed {id}"),
...     on_update=lambda node: print(f"Updated {node.name}"),
... )
wcpan.drive.core.types.RemoveAction = RemoveAction

Change action representing a removed node.

A tuple of (True, node_id) indicating that the node with the given ID was removed from the drive.

Example

Pattern matching on RemoveAction:

>>> if change[0]:  # RemoveAction
...     removed, node_id = change
...     print(f"Node removed: {node_id}")
wcpan.drive.core.types.UpdateAction = UpdateAction

Change action representing an updated or added node.

A tuple of (False, node) indicating that the node was either newly created or had its metadata updated.

Example

Pattern matching on UpdateAction:

>>> if not change[0]:  # UpdateAction
...     updated, node = change
...     print(f"Node updated: {node.name}")
wcpan.drive.core.types.PrivateDict = PrivateDict

Service-specific metadata dictionary for custom attributes.

Implementations can store arbitrary string key-value pairs to support service-specific features not covered by the standard Node attributes. For example, sharing permissions, labels, or custom properties.

wcpan.drive.core.types.CreateFileService = CreateFileService

Factory type for creating FileService instances.

See CreateService for details.

wcpan.drive.core.types.CreateFileServiceMiddleware = CreateFileServiceMiddleware

Middleware factory type for wrapping FileService instances.

See CreateServiceMiddleware for details.

wcpan.drive.core.types.CreateSnapshotService = CreateSnapshotService

Factory type for creating SnapshotService instances.

See CreateService for details.

wcpan.drive.core.types.CreateSnapshotServiceMiddleware = CreateSnapshotServiceMiddleware

Middleware factory type for wrapping SnapshotService instances.

See CreateServiceMiddleware for details.