API
UnitOfWork
Unit of Work (UOW) — the main entry point of FennFlow.
It coordinates file operations by managing:
- backend (operation metadata storage)
- connector (actual storage, e.g. S3)
- execution and compensation logic (Saga pattern)
Example::
class UOW(UnitOfWork):
    config = ConfigDict(
        backend=PostgresBackendConfig(...),
        connector=S3ConnectorConfig(...),
    )
    user_files = S3RepoField(UserFiles, bucket_name="bucket_name")
    # or
    # user_files = RepoField(UserFiles, namespace="bucket_name")
async with UOW() as uow:
    await uow.user_files.at("user1/").put(file)
Behavior:
- By default, auto_commit=True: all operations are committed if the context exits successfully
- If an exception occurs or auto_commit=False: rollback is triggered with compensation logic
Important:
- Users should NOT interact with the backend or connector directly
- All operations must go through the UOW
- Rollback applies compensation in reverse order (Saga pattern)
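The commit-or-compensate behavior listed above can be illustrated with a small self-contained sketch. It does not use FennFlow itself: `MiniUnitOfWork`, `register`, and the `log` list are hypothetical names chosen for illustration, and the real implementation may differ.

```python
import asyncio


class MiniUnitOfWork:
    """Hypothetical stand-in for a Saga-style unit of work (not FennFlow's class)."""

    def __init__(self, auto_commit=True):
        self.auto_commit = auto_commit
        self.log = []             # records what happened, for inspection
        self._compensations = []  # undo callbacks, in execution order

    def register(self, name):
        # Pretend to run an operation and remember how to undo it.
        self.log.append(f"do {name}")
        self._compensations.append(lambda: self.log.append(f"undo {name}"))

    async def commit(self):
        self._compensations.clear()
        self.log.append("commit")

    async def rollback(self):
        # Saga pattern: compensate in reverse order of execution.
        while self._compensations:
            self._compensations.pop()()
        self.log.append("rollback")

    async def __aenter__(self):
        return self

    async def __aexit__(self, exc_type, exc, tb):
        if exc_type is None and self.auto_commit:
            await self.commit()
        else:
            await self.rollback()
        return False  # never swallow the exception


async def demo():
    uow = MiniUnitOfWork()
    try:
        async with uow:
            uow.register("put a.txt")
            uow.register("put b.txt")
            raise RuntimeError("boom")
    except RuntimeError:
        pass
    return uow.log


failed_log = asyncio.run(demo())
```

In this sketch the second upload is undone before the first, which is the reverse-order property the Saga pattern relies on.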
Attributes:
| Name | Type | Description |
|---|---|---|
| backend | BackendOrchestrator | Stores operation metadata (pending, done, failed) |
| connector | AbstractConnector | Performs actual storage operations (e.g. S3 API calls) |
_operation_executor: Executes and compensates operations
Methods:
| Name | Description |
|---|---|
| commit | Persists operation state via backend |
| rollback | Runs compensation for all pending operations and then rolls back backend state |
Source code in src/fennflow/uow/core.py
backend
property
Direct access to the backend for read-only inspection.
Warning: mutating backend state directly bypasses Saga guarantees. Use UoW methods for all write operations.
connector
property
Direct access to the connector.
Warning: operations performed directly on the connector are not tracked by the backend, so they will not be compensated by the UoW.
UowInspector
dataclass
Extracts info from Unit of Work.
Source code in src/fennflow/uow/inspector.py
ConfigDict
Bases: TypedDict
Configuration for a UnitOfWork instance.
All fields are optional — if not provided, defaults are used.
Attributes:
| Name | Type | Description |
|---|---|---|
| backend | BackendConfig | Configuration for the metadata backend (e.g. |
| connector | ConnectorConfig | Configuration for the storage connector (e.g. |
Example
class UOW(UnitOfWork):
    config = ConfigDict(
        backend=InMemoryBackendConfig(),
        connector=S3ConnectorConfig(...),
    )
Source code in src/fennflow/core/configs/base.py
BackendFactory
Factory for creating backends from config.
Source code in src/fennflow/backends/_factory.py
InMemoryBackendConfig
Bases: AbstractBackendConfig
Configuration for the in-memory backend.
No configuration is required — the in-memory backend is zero-dependency.
Source code in src/fennflow/backends/in_memory/config.py
ConnectorFactory
Factory for creating connector instances from config objects.
Resolves the appropriate connector class from connector_registry
based on the config class name.
Example::
connector = ConnectorFactory.from_config(S3ConnectorConfig(...))
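As a rough illustration of resolving a connector class from a registry keyed by config class name, the following self-contained sketch may help. The names `register_connector`, `MemoryConfig`, `MemoryConnector`, and `connector_from_config` are hypothetical and only mirror the idea described above; FennFlow's actual registry may be organized differently.

```python
from dataclasses import dataclass

# Hypothetical registry mapping config class names to connector classes.
connector_registry = {}


def register_connector(config_cls):
    """Decorator that registers a connector under its config class name."""
    def decorator(connector_cls):
        connector_registry[config_cls.__name__] = connector_cls
        return connector_cls
    return decorator


@dataclass
class MemoryConfig:
    pass


@dataclass
class UnknownConfig:
    pass


@register_connector(MemoryConfig)
class MemoryConnector:
    def __init__(self, config):
        self.config = config


def connector_from_config(config):
    # Look up the connector by the name of the config's class.
    name = type(config).__name__
    try:
        connector_cls = connector_registry[name]
    except KeyError:
        raise ValueError(f"No connector registered for {name}") from None
    return connector_cls(config)


connector = connector_from_config(MemoryConfig())
```

Passing an unregistered config type (here `UnknownConfig`) raises `ValueError` in this sketch, matching the documented failure mode.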
Source code in src/fennflow/connectors/_factory.py
from_config(config)
staticmethod
Create a connector instance from a config object.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| config | ConnectorConfig | The connector configuration instance. | required |
Returns:
| Type | Description |
|---|---|
| AbstractConnector | An initialized connector instance. |
Raises:
| Type | Description |
|---|---|
| ValueError | If no connector is registered for the config type. |
Source code in src/fennflow/connectors/_factory.py
InMemoryConnector
Bases: AbstractConnector
In-memory connector for file storage, primarily used for testing.
Stores files in a class-level dictionary shared across all instances.
Use drop_all() between tests to reset state.
Example::
class UOW(UnitOfWork):
    config = ConfigDict(
        connector=InMemoryConnectorConfig(),
    )
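To show why a class-level dictionary needs an explicit reset between tests, here is a minimal sketch. `SharedMemoryStore` and its `put`/`get` methods are hypothetical; only the idea of shared class-level state and a `drop_all()` reset comes from the description above.

```python
class SharedMemoryStore:
    """Hypothetical store; state lives on the class, not on instances."""

    _files = {}  # class-level: shared by every instance

    def put(self, path, data):
        type(self)._files[path] = data

    def get(self, path):
        return type(self)._files.get(path)

    @classmethod
    def drop_all(cls):
        # Clear the shared dictionary in place.
        cls._files.clear()


first, second = SharedMemoryStore(), SharedMemoryStore()
first.put("user1/a.txt", b"hello")
leaked = second.get("user1/a.txt")   # visible through another instance
SharedMemoryStore.drop_all()         # reset, e.g. in a test fixture
after_reset = second.get("user1/a.txt")
```

Without the reset, data written in one test would remain visible to the next one.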
Source code in src/fennflow/connectors/in_memory/core.py
InMemoryConnectorConfig
Bases: AbstractConnectorConfig
Configuration for the in-memory connector.
No configuration is required — the in-memory connector is zero-dependency and is intended for testing and development only.
Source code in src/fennflow/connectors/in_memory/config.py
S3Connector
Bases: AbstractConnector[S3Extra]
Connector for AWS S3-compatible object storage via aiobotocore.
Use S3ConnectorConfig to configure credentials, region, etc.
Example::
class UOW(UnitOfWork):
    config = ConfigDict(
        connector=S3ConnectorConfig(...),
    )
Source code in src/fennflow/connectors/s3/core.py
S3ConnectorConfig
Bases: AbstractConnectorConfig
Configuration for the S3 connector.
Credentials can be provided explicitly via this config or through any method
supported by the AWS credential chain
(environment variables, ~/.aws/credentials, IAM roles, etc.).
See the AWS documentation
(https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html)
for the full list of supported options.
Attributes:
| Name | Type | Description |
|---|---|---|
| aws_access_key_id | str \| None | AWS access key ID. |
| aws_secret_access_key | str \| None | AWS secret access key. |
| endpoint_url | str \| None | Custom endpoint URL for S3-compatible storage. |
| aiobotocore_config | AioConfig \| None | Advanced aiobotocore client configuration. |
Example::
# explicit credentials
class UOW(UnitOfWork):
    user_files = UserFiles
    config = ConfigDict(
        connector=S3ConnectorConfig(
            aws_access_key_id="key",
            aws_secret_access_key="secret",
        )
    )
# rely on AWS credential chain
S3ConnectorConfig()
Source code in src/fennflow/connectors/s3/config.py
CreateRepository
Bases: AtRepository, ValidateDuplicatesMixin
Repository for uploading (creating) files in the storage.
This repository implements the "create" operation, which uploads new files to the configured storage (e.g. S3) within the current Unit of Work.
Example::
file1 = TextContent.from_content("This is the first file.")
await uow.user_files.at("user1/").create(file1)
Behavior:
- Each file is registered in the backend as a pending operation
- Files are uploaded via the connector
- Backend commit is executed on uow.commit
Raises:
- RecordAlreadyExistsException: If a file with the same path already exists in the backend
- FilepathsCollisionError: If files with the same filepath are passed
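The duplicate-filepath check can be pictured with a short sketch. `DuplicatePathsError` and `validate_unique_paths` are hypothetical names standing in for the library's own exception and mixin, whose internals are not shown in this reference.

```python
from collections import Counter


class DuplicatePathsError(ValueError):
    """Hypothetical error; stands in for a filepath-collision exception."""


def validate_unique_paths(paths):
    # Collect every path that appears more than once in the same call.
    duplicates = sorted(p for p, n in Counter(paths).items() if n > 1)
    if duplicates:
        raise DuplicatePathsError(f"Duplicate filepaths: {duplicates}")
    return list(paths)


accepted = validate_unique_paths(["user1/a.txt", "user1/b.txt"])
```

Rejecting the whole batch up front avoids registering some files as pending before the collision is noticed.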
Source code in src/fennflow/repositories/create.py
DeleteRepository
Bases: AtRepository
Repository mixin for deleting files from storage.
Implements Saga-based deletion with automatic compensation on failure.
Source code in src/fennflow/repositories/delete.py
delete(path, **provider_extra)
async
Delete a file from storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Path to the file relative to the current directory. | required |
| **provider_extra | Any | Additional kwargs forwarded to the connector. | {} |
Returns:
| Type | Description |
|---|---|
| bool | True if the file was deleted, False if it did not exist. |
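This reference does not say how a deletion is compensated. One common approach, sketched below purely as an assumption, is to keep the removed bytes and register a callback that restores them on rollback. `delete_with_undo`, the dict-based `storage`, and `undo_log` are hypothetical.

```python
def delete_with_undo(storage, path, undo_log):
    """Delete `path` from a dict-like storage and record how to restore it."""
    if path not in storage:
        return False  # nothing to delete, nothing to compensate
    removed = storage.pop(path)
    undo_log.append(lambda: storage.__setitem__(path, removed))
    return True


storage = {"user1/a.txt": b"hello"}
undo_log = []
deleted = delete_with_undo(storage, "user1/a.txt", undo_log)
missing = delete_with_undo(storage, "user1/none.txt", undo_log)

# Simulated rollback: run compensations in reverse order.
while undo_log:
    undo_log.pop()()
```

After the simulated rollback the deleted file is back in `storage`, and the call for the non-existent path returned `False` without registering any compensation.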
Source code in src/fennflow/repositories/delete.py
GetRepository
Bases: AtRepository
Repository for retrieving a file from storage within the current scope.
This method returns a MediaResponse object containing the requested file,
if it exists according to the backend (source of truth).
Example::
response = await uow.user_files.at("user1/").get("file.txt")
if response:
    file = response[0]
Behavior:
- The backend is treated as the source of truth
- If the file is not present in the backend, the storage is NOT queried
- If the file exists in the backend, it is fetched from the storage via the connector
Notes:
- This method is read-only and does not participate in transaction flows (no saga)
- No network request is made if the backend does not contain the file
- Storage and backend may become inconsistent
(e.g. after restart with InMemoryBackend);
in such cases, use a reconcile mechanism to resync state.
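The "backend as source of truth" rule can be sketched as follows. `CountingStorage`, `get_file`, and the `backend_paths` set are hypothetical stand-ins for the connector and backend; the point is only that storage is never queried for a path the backend does not know.

```python
class CountingStorage:
    """Hypothetical storage that counts how often it is queried."""

    def __init__(self, blobs):
        self.blobs = blobs
        self.calls = 0

    def fetch(self, path):
        self.calls += 1
        return self.blobs[path]


def get_file(known_paths, storage, path):
    # known_paths plays the role of the backend's records.
    if path not in known_paths:
        return None  # storage is not queried at all
    return storage.fetch(path)


storage = CountingStorage({"user1/orphan.txt": b"x", "user1/file.txt": b"data"})
backend_paths = {"user1/file.txt"}

missing = get_file(backend_paths, storage, "user1/orphan.txt")
calls_after_missing = storage.calls
found = get_file(backend_paths, storage, "user1/file.txt")
```

Note that `user1/orphan.txt` exists in storage but is still reported as missing, which is the inconsistency the notes above warn about.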
Source code in src/fennflow/repositories/get.py
get(*paths, **provider_extra)
async
Retrieve a file from storage within the current scope.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| *paths | str | Relative file paths within the scoped repository | () |
| **provider_extra | Any | Additional provider-specific parameters passed directly to the connector (e.g. S3 | {} |
Returns:
| Name | Type | Description |
|---|---|---|
| MediaResponse | MediaResponse | |
Source code in src/fennflow/repositories/get.py
ListRepository
Bases: AtRepository
Repository for listing files in storage within the current scope.
Source code in src/fennflow/repositories/list.py
list(prefix='', continuation_token=OMIT, limit=1000)
async
Lists files under the current path, optionally filtered by prefix.
Files are visible if they are uploaded (committed) or pending within the current session. Pending files from other sessions are not returned.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prefix | str | Sub-path to filter results. Appended to the current | '' |
| continuation_token | Omittable[str] | Opaque token returned by a previous call to continue paginating. | OMIT |
| limit | int | Maximum number of storage_paths to return. Defaults to | 1000 |
Returns:
| Name | Type | Description |
|---|---|---|
| ListResponse | ListResponse | A container of storage_paths matching the query. Includes a |
Example::
async with UOW() as uow:
    await uow.files.at("folder1/").put(file1, file2, file3)
    page = await uow.files.at("folder1/").list(limit=2)
    next_page = await uow.files.at("folder1/").list(
        limit=2,
        continuation_token=page.continuation_token,
    )
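Token-based pagination is usually consumed in a loop until no further token is returned. The sketch below illustrates that loop with a hypothetical in-memory `list_page` function; it assumes the token is a stringified offset and that `None` marks the last page, neither of which is specified by this reference.

```python
def list_page(paths, limit, continuation_token=None):
    """Hypothetical paginator: the token is the index to resume from, as a string."""
    start = int(continuation_token) if continuation_token is not None else 0
    items = paths[start:start + limit]
    next_start = start + limit
    next_token = str(next_start) if next_start < len(paths) else None
    return items, next_token


def list_all(paths, limit):
    # Keep requesting pages until the paginator stops returning a token.
    collected, token = [], None
    while True:
        items, token = list_page(paths, limit, token)
        collected.extend(items)
        if token is None:
            return collected


first_page = list_page(["a", "b", "c"], limit=2)
everything = list_all(["a", "b", "c"], limit=2)
```

Callers should treat the token as opaque, as the parameter description states, and only pass it back unchanged.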
Source code in src/fennflow/repositories/list.py
PutRepository
Bases: AtRepository, ValidateDuplicatesMixin
Repository for upserting files in the storage.
This repository implements the "put" operation, which uploads files to the configured storage (e.g. S3) within the current Unit of Work, creating new files or overwriting existing ones.
Example::
file1 = TextContent.from_content("This is the first file.")
await uow.user_files.at("user1/").put(file1)
Behavior:
- Each file is registered in the backend as a pending operation
- Files are uploaded via the connector
- Backend commit is executed on uow.commit
Raises: FilepathsCollisionError: If files with the same filepath are passed
Source code in src/fennflow/repositories/put.py
RepoField
Bases: Generic[RepoType]
A descriptor that lazily initializes a repository instance on a UnitOfWork.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| repo_cls | type[RepoType] | The repository class to instantiate. | required |
| namespace | Namespace | The storage namespace (e.g. S3 bucket name) for this repository. | required |
Example
class UOW(UnitOfWork):
    user_files = RepoField(UserFiles, namespace="user-files")
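For readers unfamiliar with descriptors, the following sketch shows one way lazy initialization can work with Python's descriptor protocol. `LazyRepoField`, `Files`, and `Work` are hypothetical and greatly simplified; this is not FennFlow's RepoField implementation.

```python
class LazyRepoField:
    """Hypothetical descriptor; builds the repository on first access."""

    def __init__(self, repo_cls, namespace):
        self.repo_cls = repo_cls
        self.namespace = namespace

    def __set_name__(self, owner, name):
        # Remember where to cache the instance on the owning object.
        self.attr = f"_{name}_instance"

    def __get__(self, instance, owner=None):
        if instance is None:
            return self  # accessed on the class itself
        repo = instance.__dict__.get(self.attr)
        if repo is None:
            repo = self.repo_cls(self.namespace)
            instance.__dict__[self.attr] = repo
        return repo


class Files:
    created = 0  # counts constructions, to make laziness observable

    def __init__(self, namespace):
        Files.created += 1
        self.namespace = namespace


class Work:
    user_files = LazyRepoField(Files, namespace="user-files")


work = Work()
created_before = Files.created
repo = work.user_files
```

Nothing is constructed when `Work` is instantiated; the repository is built on the first attribute access and reused afterwards.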
Source code in src/fennflow/repositories/fields/base.py
S3RepoField(repo_cls, bucket_name)
Create a RepoField configured for S3 storage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| repo_cls | type[RepoType] | The repository class to instantiate. | required |
| bucket_name | BucketName | Alias for RepoField.namespace. | required |
Returns:
| Type | Description |
|---|---|
| RepoField[RepoType] | A configured RepoField bound to the given repository class. |
Example::
class UOW(UnitOfWork):
user_files = S3RepoField(UserFiles, bucket_name="my-bucket")
Source code in src/fennflow/repositories/fields/s3.py
AudioContent
Bases: BinaryContent
Media content representing an audio file.
Attributes:
| Name | Type | Description |
|---|---|---|
| duration | int \| None | Duration of the audio in seconds, if known. |
Source code in src/fennflow/files/media/audio_content.py
BinaryContent
Bases: BaseContent
Base class for binary content types.
Source code in src/fennflow/files/media/binary_content.py
ContentFactory
Factory for creating media content instances from raw data.
Resolves the appropriate content class from the registry based on
MIME type, falling back to BinaryContent for unknown types.
Example
content = ContentFactory.from_bytes("text/plain", b"Hello, World!")
url = ContentFactory.from_url("https://example.com/file.txt")
Source code in src/fennflow/files/factory.py
from_bytes(media_type, data, **kwargs)
classmethod
Create a media content instance from raw bytes.
Resolves the content class from the registry by exact MIME type match,
then by prefix match, falling back to BinaryContent if no match is found.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| media_type | str | The MIME type of the content (e.g. | required |
| data | bytes | The raw bytes to wrap. | required |
| **kwargs | Any | Additional fields passed to the content model. | {} |
Returns:
| Type | Description |
|---|---|
| BinaryMedia | A media content instance appropriate for the given MIME type. |
Raises:
| Type | Description |
|---|---|
| TypeError | If |
| ValueError | If the resolved content class fails validation. |
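The resolution order for `from_bytes` (exact MIME match, then prefix match, then a binary fallback) can be sketched like this. The `registry` contents, the class names, and the convention that prefix entries end with `/` are assumptions made for illustration only.

```python
class Binary:
    """Hypothetical fallback content class."""


class Text(Binary):
    pass


class Image(Binary):
    pass


# Hypothetical registry: full MIME types and "type/" prefixes side by side.
registry = {"text/plain": Text, "image/": Image}


def resolve_content_class(media_type):
    # 1. Exact MIME type match.
    if media_type in registry:
        return registry[media_type]
    # 2. Prefix match, e.g. "image/" covers "image/png".
    for key, cls in registry.items():
        if key.endswith("/") and media_type.startswith(key):
            return cls
    # 3. Fallback for unknown types.
    return Binary


exact = resolve_content_class("text/plain")
by_prefix = resolve_content_class("image/png")
fallback = resolve_content_class("application/zip")
```

Checking the exact match first lets a specific type override a broader prefix entry.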
Source code in src/fennflow/files/factory.py
from_url(url, media_type='application/octet-stream', **kwargs)
staticmethod
Create a UrlContent instance from a URL string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| url | str | The URL string to wrap. | required |
| media_type | str | The MIME type of the resource. Defaults to | 'application/octet-stream' |
| **kwargs | Any | Additional fields passed to the content model. | {} |
Returns:
| Type | Description |
|---|---|
| UrlContent | A |
Raises:
| Type | Description |
|---|---|
| ValueError | If the URL fails validation. |
Source code in src/fennflow/files/factory.py
DocumentContent
Bases: BinaryContent
Media content representing a document.
Source code in src/fennflow/files/media/document_content.py
ImageContent
Bases: BinaryContent
Media content representing an image file.
Attributes:
| Name | Type | Description |
|---|---|---|
| height | int \| None | Height of the image in pixels, if known. |
| width | int \| None | Width of the image in pixels, if known. |
Source code in src/fennflow/files/media/image_content.py
JsonContent
Bases: BinaryContent, FromContentAbstract, ContentPropertyAbstract, Generic[Value]
Media content representing a JSON file.
Stores JSON data as UTF-8 encoded bytes internally.
Use from_content() to create from a Python object.
Attributes:
| Name | Type | Description |
|---|---|---|
| encoding | str | The text encoding. Defaults to |
Example::
file = JsonContent.from_content({"key": "value"})
print(file.content) # {"key": "value"}
await uow.user_files.at("user1/").put(file)
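The idea of keeping JSON as UTF-8 bytes while exposing the decoded value can be sketched with a small dataclass. `JsonBlob` is a hypothetical stand-in, and the `"utf-8"` default for `encoding` is an assumption based on the description above, not a confirmed default.

```python
import json
from dataclasses import dataclass


@dataclass
class JsonBlob:
    """Hypothetical container: JSON kept as encoded bytes."""

    data: bytes
    encoding: str = "utf-8"  # assumed default

    @classmethod
    def from_content(cls, value):
        # Serialize the Python object, then encode it to bytes.
        return cls(data=json.dumps(value).encode("utf-8"))

    @property
    def content(self):
        # Decode and parse on access; the bytes stay the stored form.
        return json.loads(self.data.decode(self.encoding))


blob = JsonBlob.from_content({"key": "value"})
```

Storing bytes keeps the object ready for upload, while `content` gives back the original Python value.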
Source code in src/fennflow/files/media/json_content.py
TextContent
Bases: BinaryContent, FromContentAbstract, ContentPropertyAbstract
Media content representing a plain text file.
Stores text as UTF-8 encoded bytes internally.
Use from_content() to create from a string.
Attributes:
| Name | Type | Description |
|---|---|---|
| encoding | str | The text encoding. Defaults to |
Example::
file = TextContent.from_content("Hello, World!")
print(file.content) # "Hello, World!"
await uow.user_files.at("user1/").put(file)
Source code in src/fennflow/files/media/text_content.py
UrlContent
Bases: BaseContent
Media content representing a URL.
Attributes:
| Name | Type | Description |
|---|---|---|
| data | str | The URL string. |
Source code in src/fennflow/files/media/url_content.py
VideoContent
Bases: BinaryContent
Media content representing a video file.
Attributes:
| Name | Type | Description |
|---|---|---|
| duration | int \| None | Duration of the video in seconds. |
| height | int \| None | Height of the video in pixels. |
| width | int \| None | Width of the video in pixels. |
Source code in src/fennflow/files/media/video_content.py