
API

UnitOfWork

Unit of Work (UOW) — the main entry point of FennFlow.

It coordinates file operations by managing:
- backend (operation metadata storage)
- connector (actual storage, e.g. S3)
- execution and compensation logic (Saga pattern)

Example::

class UOW(UnitOfWork):
    config = ConfigDict(
        backend=PostgresBackendConfig(...),
        connector=S3ConnectorConfig(...),
    )
    user_files = S3RepoField(UserFiles, bucket_name="bucket_name")
    # or
    # user_files = RepoField(UserFiles, namespace="bucket_name")

async with UOW() as uow:
    await uow.user_files.at("user1/").put(file)

Behavior:
- By default (auto_commit=True), all operations are committed when the context exits without an exception.
- If an exception occurs, or if auto_commit=False, the UoW rolls back and runs the compensation logic.

Important:
- Users should NOT interact with the backend or connector directly.
- All operations must go through the UoW.
- Rollback applies compensation in reverse order (Saga pattern).

Attributes:

- backend (BackendOrchestrator): Stores operation metadata (pending, done, failed).
- connector (AbstractConnector): Performs actual storage operations (e.g. S3 API calls).
- _operation_executor: Executes and compensates operations.

Methods:

- commit: Marks every buffered operation as done, finalizes it, and persists operation state via the backend.
- rollback: Runs compensation for every buffered operation in reverse order, records any compensation failure on the operation, finalizes the successfully compensated ones, and then persists the resulting state via the backend.
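The commit/rollback contract described above can be illustrated with a small, self-contained sketch. `MiniUnitOfWork` and `Step` are hypothetical names, not FennFlow's API: each step pairs a forward action with its compensation, and on exit the context manager either commits or undoes the completed steps in reverse order. Unlike FennFlow, which registers each operation in the backend before executing it and records compensation failures, this sketch only tracks steps that finished and skips compensations that raise.

```python
import asyncio
from collections.abc import Awaitable, Callable
from dataclasses import dataclass, field


@dataclass
class Step:
    """A forward action paired with the action that undoes it (hypothetical)."""

    name: str
    execute: Callable[[], Awaitable[None]]
    compensate: Callable[[], Awaitable[None]]


@dataclass
class MiniUnitOfWork:
    """Hypothetical stand-in for UnitOfWork; not FennFlow's API."""

    auto_commit: bool = True
    completed: list[Step] = field(default_factory=list)
    committed: bool = False

    async def run(self, step: Step) -> None:
        await step.execute()
        self.completed.append(step)  # only finished steps need undoing

    async def commit(self) -> None:
        self.committed = True
        self.completed.clear()

    async def rollback(self) -> None:
        # Saga: undo completed steps in reverse order of execution.
        for step in reversed(self.completed):
            try:
                await step.compensate()
            except Exception:
                pass  # FennFlow logs and records the failure instead
        self.completed.clear()

    async def __aenter__(self) -> "MiniUnitOfWork":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        # Same decision as UnitOfWork.__aexit__: roll back on error or when
        # auto_commit is disabled, otherwise commit.
        if exc_type is not None or not self.auto_commit:
            await self.rollback()
        else:
            await self.commit()


async def demo() -> list[str]:
    log: list[str] = []

    def make_step(name: str) -> Step:
        async def do() -> None:
            log.append(f"do {name}")

        async def undo() -> None:
            log.append(f"undo {name}")

        return Step(name, do, undo)

    try:
        async with MiniUnitOfWork() as uow:
            await uow.run(make_step("a"))
            await uow.run(make_step("b"))
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass  # __aexit__ returns None, so the error still propagates
    return log


if __name__ == "__main__":
    print(asyncio.run(demo()))  # ['do a', 'do b', 'undo b', 'undo a']
```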

Source code in src/fennflow/uow/core.py
class UnitOfWork:
    """Unit of Work (UOW) — the main entry point of FennFlow.

    It coordinates file operations by managing:
    - backend (operation metadata storage)
    - connector (actual storage, e.g. S3)
    - execution and compensation logic (Saga pattern)

    **Example**::
        class UOW(UnitOfWork):
            config = ConfigDict(
                backend=PostgresBackendConfig(...),
                connector=S3ConnectorConfig(...),
            )
            user_files = S3RepoField(UserFiles, bucket_name="bucket_name")
            # or
            # user_files = RepoField(UserFiles, namespace="bucket_name")

        async with UOW() as uow:
            await uow.user_files.at("user1/").put(file)

    **Behavior**:
    - By default, `auto_commit=True`:
        commits all operations if the context exits successfully
    - If an exception occurs or `auto_commit=False`:
        triggers rollback with compensation logic

    Important:
    - Users should NOT interact with backend or connector directly
    - All operations must go through UOW
    - Rollback applies compensation in reverse order (Saga pattern)

    Attributes:
        backend:
            Stores operation metadata (pending, done, failed)

        connector:
            Performs actual storage operations (e.g. S3 API calls)

        _operation_executor:
            Executes and compensates operations

    Methods:
        commit():
            Marks buffered operations as done, finalizes them,
            and persists operation state via backend

        rollback():
            Runs compensation for all buffered operations in reverse
            order, then persists the resulting state via backend
    """

    config: ConfigDict | None = None

    def __init__(
        self,
        auto_commit: bool = True,
    ):
        self._auto_commit = auto_commit
        self._session_id = uuid.uuid4()
        self._resolved_config = ConfigResolver.resolve_config(self.config)

        self._backend = BackendFactory.from_config(config=self._resolved_config.backend)
        self._connector = ConnectorFactory.from_config(
            config=self._resolved_config.connector
        )
        self._operation_executor = OperationExecutor(
            connector=self.connector,
        )

    @property
    def backend(self) -> BackendOrchestrator:
        """Direct access to the backend for read-only inspection.

        Warning: mutating backend state directly bypasses Saga guarantees.
        Use UoW methods for all write operations.
        """
        return self._backend

    @property
    def connector(self) -> AbstractConnector:
        """Direct access to the connector.

        Warning: operations performed directly on the connector
        are not tracked by the backend, so the UoW cannot
        compensate them on rollback.
        """
        return self._connector

    async def __aenter__(
        self,
    ):
        try:
            await asyncio.gather(
                self.connector.open(),
                self.backend.open(),
            )

            await ReconcileOrchestrator().reconcile_if_needed(uow=self)

            return self

        except Exception:
            await self._cleanup()
            raise

    async def __aexit__(
        self,
        exc_type,
        exc,
        tb,
    ):
        try:
            if exc_type is not None or not self._auto_commit:
                await self.rollback()
            elif self._auto_commit:
                await self.commit()
        finally:
            await self._cleanup()

    async def _finalize_operation(self, operation: OperationRecord) -> None:
        try:
            await self._operation_executor.finalize(operation)
        except Exception:
            logger.warning(
                "Finalization failed.",
                extra={
                    "operation_id": operation.record.operation_id,
                    "session_id": operation.record.session_id,
                },
                exc_info=True,
            )

    async def _finalize_operations(self, operations: Iterable[OperationRecord]) -> None:
        await asyncio.gather(
            *(self._finalize_operation(op) for op in operations),
            return_exceptions=True,
        )

    async def commit(
        self,
    ) -> None:
        operations = self.backend.session_buffer.get_all()

        if operations:
            for operation in operations:
                operation.record.mark_done()

            with suppress(Exception):
                await self._finalize_operations(operations)
        await self.backend.commit()

    async def rollback(
        self,
    ) -> None:
        operations = self.backend.session_buffer.get_all()
        finalize_operations = []
        for operation in reversed(tuple(operations)):
            try:
                await self._operation_executor.compensate(operation)
            except Exception as e:
                logger.exception(
                    "Compensation failed.",
                    extra={
                        "operation_id": operation.record.operation_id,
                        "session_id": operation.record.session_id,
                    },
                )
                operation.record.mark_compensation_failed(error=str(e))

            else:
                finalize_operations.append(operation)

        with suppress(Exception):
            await self._finalize_operations(finalize_operations)
        await self.backend.commit()

    async def _cleanup(self) -> None:
        await asyncio.gather(
            self.connector.close(),
            self.backend.close(),
            return_exceptions=True,
        )

backend property

Direct access to the backend for read-only inspection.

Warning: mutating backend state directly bypasses Saga guarantees. Use UoW methods for all write operations.

connector property

Direct access to the connector.

Warning: operations performed directly on the connector are not tracked by the backend, so the UoW cannot compensate them on rollback.

UowInspector dataclass

Extracts information from a Unit of Work, such as the RepoField attributes declared on its class (see get_repo_fields).

Source code in src/fennflow/uow/inspector.py
@dataclass(slots=True, frozen=True)
class UowInspector:
    """Extracts information from a Unit of Work, such as its declared RepoFields."""

    uow: UnitOfWork

    def get_repo_fields(self) -> Generator[RepoField, None, None]:
        for field in vars(type(self.uow)).values():
            if isinstance(field, RepoField):
                yield field
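`UowInspector.get_repo_fields` scans `vars(type(self.uow))`, that is, the attributes defined directly on the UoW subclass. The sketch below reproduces that scan with a hypothetical `RepoField` stand-in and a frozen dataclass inspector. Note that `vars()` does not walk base classes, so a field declared on a parent UoW class is not yielded for a subclass.

```python
from collections.abc import Generator
from dataclasses import dataclass


class RepoField:
    """Hypothetical stand-in for FennFlow's RepoField."""

    def __init__(self, namespace: str) -> None:
        self.namespace = namespace


class UOW:
    user_files = RepoField(namespace="user-bucket")
    avatars = RepoField(namespace="avatar-bucket")
    retries = 3  # not a RepoField, so it is skipped


class ChildUOW(UOW):
    reports = RepoField(namespace="report-bucket")


@dataclass(frozen=True)
class Inspector:
    uow: object

    def get_repo_fields(self) -> Generator[RepoField, None, None]:
        # vars(type(...)) is the class __dict__: own attributes only,
        # in definition order; inherited attributes are not included.
        for value in vars(type(self.uow)).values():
            if isinstance(value, RepoField):
                yield value


print([f.namespace for f in Inspector(UOW()).get_repo_fields()])
# ['user-bucket', 'avatar-bucket']
print([f.namespace for f in Inspector(ChildUOW()).get_repo_fields()])
# ['report-bucket']
```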

ConfigDict

Bases: TypedDict

Configuration for a UnitOfWork instance.

All fields are optional — if not provided, defaults are used.

Attributes:

- backend (BackendConfig): Configuration for the metadata backend (e.g. InMemoryBackendConfig).
- connector (ConnectorConfig): Configuration for the storage connector (e.g. S3ConnectorConfig).
- reconcile (ReconcileConfig): Reconciliation settings.

Example

class UOW(UnitOfWork):
    config = ConfigDict(
        backend=InMemoryBackendConfig(),
        connector=S3ConnectorConfig(...),
    )

Source code in src/fennflow/core/configs/base.py
class ConfigDict(TypedDict, total=False):
    """Configuration for a UnitOfWork instance.

    All fields are optional — if not provided, defaults are used.

    Attributes:
        backend: Configuration for the metadata backend
            (e.g. ``InMemoryBackendConfig``).
        connector: Configuration for the storage connector (e.g. ``S3ConnectorConfig``).
        reconcile: Reconciliation settings (``ReconcileConfig``).

    Example:
        class UOW(UnitOfWork):
            config = ConfigDict(
                backend=InMemoryBackendConfig(),
                connector=S3ConnectorConfig(...),
            )
    """

    backend: BackendConfig
    connector: ConnectorConfig
    reconcile: ReconcileConfig
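Because `ConfigDict` is a `TypedDict` with `total=False`, every key may be omitted and something has to supply the missing values (`UnitOfWork.__init__` calls `ConfigResolver.resolve_config(self.config)`). The resolver's actual defaults are not shown on this page; the sketch below is a hypothetical `resolve_config` that assumes in-memory defaults purely for illustration, with stand-in config classes.

```python
from dataclasses import dataclass
from typing import Any, Optional, TypedDict


@dataclass(frozen=True)
class InMemoryBackendConfig:  # stand-in for the FennFlow class
    pass


@dataclass(frozen=True)
class InMemoryConnectorConfig:  # stand-in for the FennFlow class
    pass


class ConfigDict(TypedDict, total=False):
    backend: Any
    connector: Any


@dataclass(frozen=True)
class ResolvedConfig:
    backend: Any
    connector: Any


def resolve_config(config: Optional[ConfigDict]) -> ResolvedConfig:
    """Hypothetical resolver: fill every omitted key with a default.

    The in-memory defaults are an assumption made for this sketch only.
    """
    provided = config or {}
    return ResolvedConfig(
        backend=provided.get("backend", InMemoryBackendConfig()),
        connector=provided.get("connector", InMemoryConnectorConfig()),
    )


# total=False: an empty ConfigDict, or one with a single key, is valid.
partial = ConfigDict(backend=InMemoryBackendConfig())
print(resolve_config(partial))
```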

BackendFactory

Factory for creating backends from config.

Source code in src/fennflow/backends/_factory.py
class BackendFactory:
    """Factory for creating backends from config."""

    @staticmethod
    def from_config(config: BackendConfig) -> BackendOrchestrator:

        specific_backend_factory = backend_registry.get(config.__class__.__name__)
        if not specific_backend_factory:
            raise KeyError(f"Unknown backend for : {type(config)=}")

        backend = specific_backend_factory.from_config(
            config,
        )

        return BackendOrchestrator(
            backend_engine=backend,
            session_buffer=InMemorySessionBuffer(),
        )
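`BackendFactory.from_config` looks up an engine factory by the config's class name and wraps the result in a `BackendOrchestrator` with a new `InMemorySessionBuffer`. A minimal sketch of that lookup, with hypothetical stand-in classes (`InMemoryEngine`, `Orchestrator`) in place of FennFlow's:

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass(frozen=True)
class InMemoryBackendConfig:  # stand-in config class
    pass


class InMemoryEngine:  # hypothetical backend engine
    @classmethod
    def from_config(cls, config: InMemoryBackendConfig) -> "InMemoryEngine":
        return cls()


@dataclass
class Orchestrator:  # hypothetical stand-in for BackendOrchestrator
    backend_engine: Any
    session_buffer: list = field(default_factory=list)  # fresh per instance


# Keyed by the config *class name*, as in BackendFactory.from_config.
backend_registry: dict[str, Any] = {
    InMemoryBackendConfig.__name__: InMemoryEngine,
}


def backend_from_config(config: Any) -> Orchestrator:
    engine_factory = backend_registry.get(type(config).__name__)
    if engine_factory is None:
        raise KeyError(f"Unknown backend for config type {type(config).__name__!r}")
    return Orchestrator(backend_engine=engine_factory.from_config(config))
```

One consequence of keying the registry on `__name__` (the same lookup `ConnectorFactory` uses): a subclass of a registered config class is not found because its name differs, and two config classes with the same name in different modules would share one registry entry.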

InMemoryBackendConfig

Bases: AbstractBackendConfig

Configuration for the in-memory backend.

No configuration is required — the in-memory backend is zero-dependency.

Source code in src/fennflow/backends/in_memory/config.py
class InMemoryBackendConfig(AbstractBackendConfig):
    """Configuration for the in-memory backend.

    No configuration is required — the in-memory backend
    is zero-dependency.
    """

ConnectorFactory

Factory for creating connector instances from config objects.

Resolves the appropriate connector class from connector_registry based on the config class name.

Example::

connector = ConnectorFactory.from_config(S3ConnectorConfig(...))
Source code in src/fennflow/connectors/_factory.py
class ConnectorFactory:
    """Factory for creating connector instances from config objects.

    Resolves the appropriate connector class from ``connector_registry``
    based on the config class name.

    Example::

        connector = ConnectorFactory.from_config(S3ConnectorConfig(...))
    """

    @staticmethod
    def from_config(config: ConnectorConfig) -> AbstractConnector:
        """Create a connector instance from a config object.

        Args:
            config: The connector configuration instance.

        Returns:
            An initialized connector instance.

        Raises:
            KeyError: If no connector is registered for the config type.
        """
        connector_cls = connector_registry.get(config.__class__.__name__)
        if not connector_cls:
            raise KeyError(f"Unknown connector for : {type(config)=}")

        return connector_cls(config=config)

from_config(config) staticmethod

Create a connector instance from a config object.

Parameters:

- config (ConnectorConfig, required): The connector configuration instance.

Returns:

- AbstractConnector: An initialized connector instance.

Raises:

- KeyError: If no connector is registered for the config type.


InMemoryConnector

Bases: AbstractConnector

In-memory connector for file storage, primarily used for testing.

Stores files in a class-level dictionary shared across all instances; use drop_all() between tests to reset state.

Example::

class UOW(UnitOfWork):
    config = ConfigDict(
        connector=InMemoryConnectorConfig(),
    )
Source code in src/fennflow/connectors/in_memory/core.py
class InMemoryConnector(AbstractConnector):
    """In-memory connector for file storage, primarily used for testing.

    Stores files in a class-level dictionary shared across all instances;
    use ``drop_all()`` between tests to reset state.

    Example::

        class UOW(UnitOfWork):
            config = ConfigDict(
                connector=InMemoryConnectorConfig(),
            )
    """

    _storage: defaultdict[Namespace, dict[StoragePath, BinaryMedia]] | None = None

    async def open(self) -> Self:
        return self

    async def close(self):
        pass

    def __init__(self, config: InMemoryConnectorConfig):
        self._config = config

        if self.__class__._storage is None:
            self.__class__._storage = defaultdict(dict)

    @property
    def storage(
        self,
    ) -> defaultdict[Namespace, dict[StoragePath, BinaryMedia]]:
        if self.__class__._storage is None:
            raise RuntimeError(
                "Cannot get in-memory storage. InMemoryConnector is not initialized.",
            )
        return self.__class__._storage

    async def put(
        self,
        file: BinaryMedia,
        repo_extra: RepoExtra,
        **extra,  # noqa: ARG002
    ) -> None:
        namespace = repo_extra["namespace"]
        self.storage[namespace][file.storage_path] = file
        logger.debug(f"{file=} uploaded to {namespace=}")

    async def get(
        self,
        storage_path: StoragePath,
        repo_extra: RepoExtra,
        **extra: dict[Any, Any],  # noqa: ARG002
    ) -> MediaResponse:

        if storage_path not in self.storage[repo_extra["namespace"]]:
            return MediaResponse()

        file = self.storage[repo_extra["namespace"]][storage_path]

        return MediaResponse(
            media=(
                ContentFactory.from_bytes(
                    media_type=file.media_type,
                    data=file.data,
                    **file.get_metadata(),
                ),
            )
        )

    async def delete(
        self,
        storage_path: StoragePath,
        repo_extra: RepoExtra,
        **extra: dict[Any, Any],  # noqa: ARG002
    ):
        self.storage[repo_extra["namespace"]].pop(storage_path, None)

    @reraise_with(NoSuchKeyException(), catch=KeyError)
    async def copy_object(
        self,
        repo_extra: RepoExtra,
        from_storage_path: StoragePath,
        to_storage_path: StoragePath,
        to_namespace: Namespace,
        **extra: dict[Any, Any],  # noqa: ARG002
    ):
        file = self.storage[repo_extra["namespace"]][from_storage_path]
        self.storage[to_namespace][to_storage_path] = file

    @classmethod
    def drop_all(cls) -> None:
        cls._storage = defaultdict(dict)

    async def list_objects(
        self,
        prefix: str,
        repo_extra: RepoExtraType,
        limit: int = 1000,
        continuation_token: Omittable[str] | None = OMIT,
        **extra: dict[Any, Any],  # noqa: ARG002
    ) -> ListResponse:
        filtered_storage_paths = []
        all_storage_paths = sorted(self.storage[repo_extra["namespace"]])

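        if continuation_token:
            # The token is the first path NOT returned by the previous page,
            # so resume *at* it (bisect_left) rather than after it.
            index = bisect.bisect_left(all_storage_paths, continuation_token)
        else:
            index = 0

        for storage_path in islice(all_storage_paths, index, None):
            if limit <= 0: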
                continuation_token = storage_path
                break

            if storage_path.startswith(prefix):
                filtered_storage_paths.append(storage_path)
                limit -= 1
        else:
            continuation_token = None

        return ListResponse(
            storage_paths=filtered_storage_paths,
            continuation_token=continuation_token,
        )
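`list_objects` pages through the sorted keys of a namespace and hands back a continuation token for the next call. The sketch below uses hypothetical `list_page` and `list_all` helpers over a plain list of keys to show one consistent convention: the token is the first matching key that was *not* returned, so the next page resumes at it with `bisect.bisect_left`.

```python
import bisect
from dataclasses import dataclass
from typing import Optional


@dataclass
class Page:
    storage_paths: list[str]
    continuation_token: Optional[str]


def list_page(
    keys: list[str], prefix: str, limit: int, token: Optional[str] = None
) -> Page:
    """Return up to `limit` keys that start with `prefix`, in sorted order.

    The token is the first matching key NOT returned, so the next call
    resumes at it inclusively via bisect_left. `limit` must be >= 1.
    """
    if limit < 1:
        raise ValueError("limit must be at least 1")
    ordered = sorted(keys)
    start = bisect.bisect_left(ordered, token) if token else 0
    page: list[str] = []
    for key in ordered[start:]:
        if not key.startswith(prefix):
            continue
        if len(page) == limit:
            return Page(page, key)  # more results exist; resume here
        page.append(key)
    return Page(page, None)  # exhausted


def list_all(keys: list[str], prefix: str, limit: int) -> list[str]:
    """Drain every page by following continuation tokens."""
    result: list[str] = []
    token: Optional[str] = None
    while True:
        page = list_page(keys, prefix, limit, token)
        result.extend(page.storage_paths)
        token = page.continuation_token
        if token is None:
            return result


keys = ["a/3", "b/1", "a/1", "a/2"]
print(list_page(keys, "a/", limit=2))
# Page(storage_paths=['a/1', 'a/2'], continuation_token='a/3')
print(list_all(keys, "a/", limit=1))  # ['a/1', 'a/2', 'a/3']
```

Checking the prefix before the limit means the token always names a key that the next page will actually return, so no page comes back empty while more matches remain.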

InMemoryConnectorConfig

Bases: AbstractConnectorConfig

Configuration for the in-memory connector.

No configuration is required — the in-memory connector is zero-dependency and is intended for testing and development only.

Source code in src/fennflow/connectors/in_memory/config.py
class InMemoryConnectorConfig(AbstractConnectorConfig):
    """Configuration for the in-memory connector.

    No configuration is required — the in-memory connector
    is zero-dependency and is intended for testing and development only.
    """

S3Connector

Bases: AbstractConnector[S3Extra]

Connector for AWS S3-compatible object storage via aiobotocore.

Use S3ConnectorConfig to configure credentials, region, etc.

Example::

class UOW(UnitOfWork):
    config = ConfigDict(
        connector=S3ConnectorConfig(...),
    )
Source code in src/fennflow/connectors/s3/core.py
class S3Connector(AbstractConnector[S3Extra]):
    """Connector for AWS S3-compatible object storage via aiobotocore.

    Use ``S3ConnectorConfig`` to configure credentials, region, etc.

    Example::

        class UOW(UnitOfWork):
            config = ConfigDict(
                connector=S3ConnectorConfig(...),
            )
    """

    _aio_session: AioSession | None = None

    def __init__(
        self,
        config: S3ConnectorConfig,
    ):
        self._config = config
        self._client: S3Client | None = None

    @property
    def aio_session(self) -> AioSession:
        if self.__class__._aio_session is None:
            raise RuntimeError("AioSession is not initialized.")

        return self.__class__._aio_session

    async def open(
        self,
    ) -> Self:
        if self.__class__._aio_session is None:
            self.__class__._aio_session = get_session()

        self._client = await S3Client(
            config=self._config,
            session=self.aio_session,
        ).open()
        return self

    async def close(
        self,
    ):
        if self._client:
            await self._client.close()
            self._client = None

    @property
    def s3client(
        self,
    ) -> S3Client:
        if self._client is None:
            raise RuntimeError("S3Connector client is not initialized.")
        return self._client

    async def put(
        self,
        file: BinaryMedia,
        repo_extra: S3Extra,
        **sdk_extra: Any,
    ) -> None:
        bucket_name = repo_extra["namespace"]
        await self.s3client.client.put_object(
            Bucket=bucket_name,
            Key=file.storage_path,
            Body=file.data,
            ContentType=file.media_type,
            Metadata=file.get_metadata(),
            **sdk_extra,
        )
        logger.debug(f"{file=} uploaded to {bucket_name=}")

    async def get(
        self,
        storage_path: StoragePath,
        repo_extra: S3Extra,
        **sdk_extra: Any,
    ) -> MediaResponse:
        response = await self.s3client.client.get_object(
            Bucket=repo_extra["namespace"],
            Key=storage_path,
            **sdk_extra,
        )
        if not response:
            return MediaResponse()

        return MediaResponse(
            media=(
                ContentFactory.from_bytes(
                    media_type=response["ContentType"],
                    data=await response["Body"].read(),
                    **response.get("Metadata", {}),
                ),
            ),
        )

    async def delete(
        self,
        storage_path: StoragePath,
        repo_extra: S3Extra,
        **sdk_extra: Any,
    ):
        bucket_name = repo_extra["namespace"]
        await self.s3client.client.delete_object(
            Bucket=bucket_name,
            Key=storage_path,
            **sdk_extra,
        )
        logger.debug(f"file with {storage_path=} deleted from {bucket_name=}")

    @reraise_with(
        NoSuchKeyException(),
        catch=lambda e: (
            isinstance(e, ClientError) and e.response["Error"]["Code"] == "NoSuchKey"
        ),
    )
    async def copy_object(
        self,
        repo_extra: S3Extra,
        from_storage_path: StoragePath,
        to_storage_path: StoragePath,
        to_namespace: Namespace,
        **sdk_extra: Any,
    ):
        bucket_name = repo_extra["namespace"]

        await self.s3client.client.copy_object(
            CopySource={"Bucket": bucket_name, "Key": from_storage_path},
            Bucket=to_namespace,
            Key=to_storage_path,
            **sdk_extra,
        )
        logger.debug(
            f"file from {bucket_name=} with {from_storage_path=} "
            f"copied to {to_namespace=}"
        )

    async def list_objects(
        self,
        prefix: str,
        repo_extra: RepoExtraType,
        limit: int = 1000,
        continuation_token: Omittable[str] | None = OMIT,
        **extra: Any,
    ) -> ListResponse:

        if continuation_token:
            extra["ContinuationToken"] = continuation_token

        response = await self.s3client.client.list_objects_v2(
            Bucket=repo_extra["namespace"],
            Prefix=prefix,
            MaxKeys=limit,
            **extra,
        )

        return ListResponse(
            storage_paths=tuple(obj["Key"] for obj in response.get("Contents", [])),
            continuation_token=response.get("NextContinuationToken"),
        )
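Both connectors use `reraise_with` to translate store-specific lookup errors (`KeyError` for the in-memory store, a `ClientError` with code `NoSuchKey` for S3) into FennFlow's `NoSuchKeyException`. Its implementation is not shown on this page; the following is a hypothetical re-implementation that accepts either an exception class or a predicate as `catch`, matching how it is called above. `NoSuchKeyException`, `lookup` and `fake_s3_get` here are stand-ins.

```python
import asyncio
import functools
from typing import Any, Callable, Union

# `catch` is either an exception class or a predicate over the exception.
Catch = Union[type, Callable[[BaseException], bool]]


def reraise_with(replacement: BaseException, catch: Catch):
    """Hypothetical decorator: turn matching errors into `replacement`."""

    def matches(exc: BaseException) -> bool:
        if isinstance(catch, type):
            return isinstance(exc, catch)
        return bool(catch(exc))

    def decorator(func):
        @functools.wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            try:
                return await func(*args, **kwargs)
            except Exception as exc:
                if matches(exc):
                    # Chain the original error so the cause stays visible.
                    raise replacement from exc
                raise

        return wrapper

    return decorator


class NoSuchKeyException(Exception):  # stand-in for FennFlow's exception
    pass


@reraise_with(NoSuchKeyException(), catch=KeyError)
async def lookup(store: dict, key: str) -> bytes:
    return store[key]


@reraise_with(NoSuchKeyException(), catch=lambda e: "NoSuchKey" in str(e))
async def fake_s3_get(code: str) -> bytes:
    raise RuntimeError(code)  # mimics an SDK error carrying an error code
```

Because the decorator receives a single exception instance, this sketch re-raises that same object on every failure; a production version might prefer to create a fresh exception per call.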

S3ConnectorConfig

Bases: AbstractConnectorConfig

Configuration for the S3 connector.

Credentials can be provided explicitly via this config or through any method supported by the AWS credential chain (environment variables, ~/.aws/credentials, IAM roles, etc.). See the AWS documentation (https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html) for the full list of supported options.

Attributes:

- aws_access_key_id (str | None): AWS access key ID.
- aws_secret_access_key (str | None): AWS secret access key.
- endpoint_url (str | None): Custom endpoint URL for S3-compatible storage.
- aiobotocore_config (AioConfig | None): Advanced aiobotocore client configuration.

Example::

# explicit credentials
class UOW(UnitOfWork):
    user_files = S3RepoField(UserFiles, bucket_name="bucket_name")
    config = ConfigDict(
        connector=S3ConnectorConfig(
            aws_access_key_id="key",
            aws_secret_access_key="secret",
        )
    )

# rely on AWS credential chain
S3ConnectorConfig()
Source code in src/fennflow/connectors/s3/config.py
class S3ConnectorConfig(AbstractConnectorConfig):
    """Configuration for the S3 connector.

    Credentials can be provided explicitly via this config or through any method
    supported by the AWS credential chain
    (environment variables, ``~/.aws/credentials``, IAM roles, etc.).
    See the `AWS documentation
    <https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-envvars.html>`_
    for the full list of supported options.

    Attributes:
        aws_access_key_id: AWS access key ID.
        aws_secret_access_key: AWS secret access key.
        endpoint_url: Custom endpoint URL for S3-compatible storage.
        aiobotocore_config: Advanced aiobotocore client configuration.

    Example::

        # explicit credentials
        class UOW(UnitOfWork):
            user_files = S3RepoField(UserFiles, bucket_name="bucket_name")
            config = ConfigDict(
                connector=S3ConnectorConfig(
                    aws_access_key_id="key",
                    aws_secret_access_key="secret",
                )
            )

        # rely on AWS credential chain
        S3ConnectorConfig()
    """

    model_config = AbstractConnectorConfig.model_config | ConfigDict(
        arbitrary_types_allowed=True
    )
    aws_access_key_id: str | None = None
    aws_secret_access_key: str | None = None
    endpoint_url: str | None = None
    aiobotocore_config: AioConfig | None = None

CreateRepository

Bases: AtRepository, ValidateDuplicatesMixin

Repository for uploading (creating) files in the storage.

This repository implements the "create" operation, which uploads new files to the configured storage (e.g. S3) within the current Unit of Work.

Example::

file1 = TextContent.from_content("This is the first file.")
await uow.user_files.at("user1/").create(file1)

Behavior:

  • Each file is registered in the backend as a pending operation
  • Files are uploaded via the connector
  • Backend commit is executed on uow.commit

Raises:

- RecordAlreadyExistsException: If a file with the same storage path already exists in the backend.
- FilepathsCollisionError: If two or more of the passed files share the same storage path.

Source code in src/fennflow/repositories/create.py
class CreateRepository(
    AtRepository,
    ValidateDuplicatesMixin,
):
    """Repository for uploading (creating) files in the storage.

    This repository implements the "create" operation, which uploads new files
    to the configured storage (e.g. S3) within the current Unit of Work.

    **Example**::

        file1 = TextContent.from_content("This is the first file.")
        await uow.user_files.at("user1/").create(file1)

    **Behavior**:

    - Each file is registered in the backend as a pending operation
    - Files are uploaded via the connector
    - Backend commit is executed on uow.commit

    **Raises**:
        RecordAlreadyExistsException:
            If a file with the same storage path already exists in the backend
        FilepathsCollisionError:
            If two or more of the passed files share the same storage path

    """

    async def create(
        self,
        *files: BinaryMedia,
        **provider_extra,
    ) -> None:
        self.validate_duplicates_from_files(files)
        tasks = []
        operations = []
        for file in files:
            file._storage_prefix = self.cwd

            record = await self._uow._backend.backend_engine.execute(
                GetVisibleQuerySpec(
                    scope=self._uow._resolved_config.backend.scope,
                    namespace=self.repo_extra["namespace"],
                    storage_path=file.storage_path,
                    session_id=self._uow._session_id,
                )
            )

            if record:
                raise RecordAlreadyExistsException(
                    storage_path=record.storage_path,
                )

            operation = OperationRecord.from_uow(
                uow=self._uow,
                operation_type=OperationTypeEnum.CREATE,
                storage_path=file.storage_path,
                context=CreateContext(file=file),
                repo_extra=self.repo_extra,
            )

            await self._uow.backend.insert(
                operation,
                on_conflict=OnConflictDoEnum.REPLACE,
            )
            tasks.append(
                self._uow._operation_executor.execute(
                    operation,
                    **provider_extra,
                ),
            )
            operations.append(operation)

        await self._uow.backend.flush(operations=operations)
        await asyncio.gather(*tasks)
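`CreateRepository.create` first rejects duplicate paths within the same call (`FilepathsCollisionError`), then rejects paths that are already visible in the backend (`RecordAlreadyExistsException`), and finally runs the uploads concurrently with `asyncio.gather`. The following is a simplified sketch over a plain dict, with stand-in exception classes and no backend bookkeeping; unlike the method above, it finishes every check before creating any upload coroutine.

```python
import asyncio
from collections import Counter


class FilepathsCollisionError(ValueError):  # stand-in
    pass


class RecordAlreadyExistsError(Exception):  # stand-in for RecordAlreadyExistsException
    pass


def validate_duplicates(paths: list[str]) -> None:
    """Reject a batch in which two files target the same storage path."""
    duplicates = sorted(p for p, n in Counter(paths).items() if n > 1)
    if duplicates:
        raise FilepathsCollisionError(f"duplicate storage paths: {duplicates}")


async def create(store: dict[str, bytes], *files: tuple[str, bytes]) -> None:
    paths = [path for path, _ in files]
    validate_duplicates(paths)
    # Run every check before creating any upload coroutine.
    for path in paths:
        if path in store:
            raise RecordAlreadyExistsError(path)

    async def upload(path: str, data: bytes) -> None:
        await asyncio.sleep(0)  # stand-in for a network round trip
        store[path] = data

    # Uploads are independent, so run them concurrently.
    await asyncio.gather(*(upload(path, data) for path, data in files))
```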

DeleteRepository

Bases: AtRepository

Repository mixin for deleting files from storage.

Implements Saga-based deletion with automatic compensation on failure.

Source code in src/fennflow/repositories/delete.py
class DeleteRepository(AtRepository):
    """Repository mixin for deleting files from storage.

    Implements Saga-based deletion with automatic compensation on failure.
    """

    async def delete(self, path: str, **provider_extra: Any) -> bool:
        """Delete a file from storage.

        Args:
            path: Path to the file relative to the current directory.
            **provider_extra: Additional kwargs forwarded to the connector.

        Returns:
            True if the file was deleted, False if it did not exist.

        """
        storage_path = self._join_path(path)
        record = await self._uow._backend.backend_engine.execute(
            GetVisibleQuerySpec(
                scope=self._uow._resolved_config.backend.scope,
                namespace=self.repo_extra["namespace"],
                storage_path=storage_path,
                session_id=self._uow._session_id,
            )
        )
        if record is None:
            return False

        operation = OperationRecord.from_uow(
            uow=self._uow,
            operation_type=OperationTypeEnum.DELETE,
            storage_path=record.storage_path,
            context=self.__get_context(record=record),
            repo_extra=self.repo_extra,
        )
        await self._uow.backend.insert(
            operation,
            on_conflict=OnConflictDoEnum.REPLACE,
        )

        await self._uow._operation_executor.execute(
            operation,
            **provider_extra,
        )
        await self._uow.backend.flush(operations=[operation])
        return True

    def __get_context(self, record: Record) -> DeleteContext:
        return DeleteContext(
            to_storage_path=record.generate_tmp_path(),
            to_namespace=self.repo_extra["namespace"],
        )

delete(path, **provider_extra) async

Delete a file from storage.

Parameters:

  • path (str, required): Path to the file relative to the current directory.
  • **provider_extra (Any, default {}): Additional keyword arguments forwarded to the connector.

Returns:

  • bool: True if the file was deleted; False if the backend has no visible record for it (the file is treated as non-existent).


GetRepository

Bases: AtRepository

Repository for retrieving files from storage within the current scope.

Its get() method returns a MediaResponse containing the requested files that exist according to the backend (the source of truth).

Example::

response = await uow.user_files.at("user1/").get("file.txt")
if response:
    file = response[0]

Behavior:

  • The backend is treated as the source of truth
  • If the file is not present in the backend, the storage is NOT queried
  • If the file exists in the backend, it is fetched from the storage via the connector

Notes:

  • get() is read-only and does not participate in transaction flows (no saga)
  • No network request is made if the backend does not contain the file
  • Storage and backend may become inconsistent (e.g. after a restart with InMemoryBackend); in such cases, use a reconcile mechanism to resync state.
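
A minimal sketch of the read path described above. It assumes a plain set of visible paths stands in for the backend query and a toy connector stands in for storage; ToyConnector and get_files are hypothetical, and a tuple replaces MediaResponse. It shows that a path missing from the backend never reaches storage, even when the object physically exists there, and that the remaining fetches run concurrently with asyncio.gather, as in the source.

```python
import asyncio


class ToyConnector:
    """Hypothetical storage client that counts how often it is called."""

    def __init__(self, objects: dict[str, bytes]) -> None:
        self.objects = objects
        self.calls = 0

    async def get(self, storage_path: str) -> bytes:
        self.calls += 1
        await asyncio.sleep(0)  # stand-in for network I/O
        return self.objects[storage_path]


async def get_files(
    visible: set[str], connector: ToyConnector, *paths: str
) -> tuple[bytes, ...]:
    # Only paths the backend knows about are fetched; the rest are skipped
    # without touching storage.
    tasks = [connector.get(p) for p in paths if p in visible]
    if not tasks:
        return ()
    # Fetch the remaining files concurrently.
    return tuple(await asyncio.gather(*tasks))
```
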
Source code in src/fennflow/repositories/get.py
class GetRepository(AtRepository):
    """Repository for retrieving a file from storage within the current scope.

    This method returns a `MediaResponse` object containing the requested file,
    if it exists according to the backend (source of truth).

    **Example**::

        response = await uow.user_files.at("user1/").get("file.txt")
        if response:
            file = response[0]

    **Behavior**:

    - The backend is treated as the source of truth
    - If the file is not present in the backend, the storage is NOT queried
    - If the file exists in the backend,
    it is fetched from the storage via the connector

    **Notes**:

    - This method is read-only and does not participate in transaction flows (no saga)
    - No network request is made if the backend does not contain the file
    - Storage and backend may become inconsistent
      (e.g. after restart with InMemoryBackend);
      in such cases, use a reconcile mechanism to resync state.
      (<add_link_here on wiki>)

    """

    async def get(self, *paths: str, **provider_extra) -> MediaResponse:
        """Retrieve a file from storage within the current scope.

        Args:
            *paths (str):
                Relative file's paths within the scoped repository

            **provider_extra (Any):
                Additional provider-specific parameters passed directly to the connector
                (e.g. S3 `get_object` arguments)

        Returns:
            MediaResponse:
                - `MediaResponse(media=(...))` if the file exists
                - `MediaResponse()` (empty) if the file does not exist
        """
        tasks = []
        for path in paths:
            storage_path = self._join_path(path)
            operation = await self._uow._backend.backend_engine.execute(
                GetVisibleQuerySpec(
                    scope=self._uow._resolved_config.backend.scope,
                    namespace=self.repo_extra["namespace"],
                    storage_path=storage_path,
                    session_id=self._uow._session_id,
                )
            )

            if operation:
                tasks.append(
                    self._uow.connector.get(
                        storage_path=storage_path,
                        repo_extra=self.repo_extra,
                        **provider_extra,
                    )
                )

        if tasks:
            results = await asyncio.gather(*tasks)
            return MediaResponse.join(results)

        return MediaResponse()

get(*paths, **provider_extra) async

Retrieve a file from storage within the current scope.

Parameters:

  • *paths (str, default ()): Relative file paths within the scoped repository.
  • **provider_extra (Any, default {}): Additional provider-specific parameters passed directly to the connector (e.g. S3 get_object arguments).

Returns:

  • MediaResponse: The requested files that exist according to the backend; paths with no visible backend record are skipped. An empty MediaResponse() is returned if none of the requested files exist.

ListRepository

Bases: AtRepository

Repository for listing files in storage within the current scope.

Source code in src/fennflow/repositories/list.py
class ListRepository(AtRepository):
    """Repository for retrieving a files from storage within the current scope."""

    async def list(
        self,
        prefix: str = "",
        continuation_token: Omittable[str] = OMIT,
        limit: int = 1000,
    ) -> ListResponse:
        """Uploads files under the current path, optionally filtered by prefix.

        Files are visible if they are uploaded (committed) or pending within the
        current session. Pending files from other sessions are not returned.

        Args:
            prefix: Sub-path to filter results. Appended to the current ``at()``
                path. Defaults to ``""`` (list everything under the current path).
            continuation_token: Opaque token returned by a previous call to
                continue paginating.
            limit: Maximum number of storage_paths to return. Defaults to ``1000``.

        Returns:
            ListResponse: A container of storage_paths matching the query. Includes
                a ``continuation_token`` if more results are available, otherwise
                ``None``.

        Example::

             async with UOW() as uow:
                 await uow.files.at("folder1/").put(file1, file2, file3)
                 page = await uow.files.at("folder1/").list(limit=2)

                 next_page = await uow.files.at("folder1/").list(
                     limit=2,
                     continuation_token=page.continuation_token,
                 )
        """
        storage_prefix = self._join_path(prefix)

        record_page = await self._uow.backend.backend_engine.execute(
            SelectVisibleQuerySpec(
                scope=self._uow._resolved_config.backend.scope,
                namespace=self.repo_extra["namespace"],
                prefix=storage_prefix,
                continuation_token=continuation_token,
                limit=limit,
                session_id=self._uow._session_id,
            )
        )

        return ListResponse(
            storage_paths=tuple(record.storage_path for record in record_page),
            continuation_token=record_page.continuation_token,
        )

list(prefix='', continuation_token=OMIT, limit=1000) async

Lists files under the current path, optionally filtered by prefix.

Files are visible if they are uploaded (committed) or pending within the current session. Pending files from other sessions are not returned.
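
The visibility rule can be written as a small predicate. This is a simplified sketch: Rec, is_visible and visible_paths are hypothetical, and the "committed"/"pending" status labels are assumptions. The real filtering happens inside SelectVisibleQuerySpec, whose internals are not shown here and may handle further states.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Rec:
    """Hypothetical backend record with only the fields the rule needs."""

    storage_path: str
    status: str  # assumed labels: "committed" or "pending"
    session_id: str


def is_visible(rec: Rec, current_session: str) -> bool:
    # Committed files are visible to every session; pending ones only to
    # the session that created them.
    if rec.status == "committed":
        return True
    return rec.status == "pending" and rec.session_id == current_session


def visible_paths(records: list[Rec], prefix: str, current_session: str) -> list[str]:
    return sorted(
        r.storage_path
        for r in records
        if r.storage_path.startswith(prefix) and is_visible(r, current_session)
    )
```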

Parameters:

  • prefix (str, default ''): Sub-path to filter results, appended to the current at() path. Defaults to "" (list everything under the current path).
  • continuation_token (Omittable[str], default OMIT): Opaque token returned by a previous call; pass it to fetch the next page.
  • limit (int, default 1000): Maximum number of storage paths to return.

Returns:

  • ListResponse: A container of the storage paths matching the query. Its continuation_token is set if more results are available, otherwise None.

Example::

 async with UOW() as uow:
     await uow.files.at("folder1/").put(file1, file2, file3)
     page = await uow.files.at("folder1/").list(limit=2)

     next_page = await uow.files.at("folder1/").list(
         limit=2,
         continuation_token=page.continuation_token,
     )
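
To read every page rather than just the first two, feed each continuation_token back until it comes back as None. The sketch runs that loop against a toy pager. Page, fake_list and list_all are hypothetical, and the token format is invented. The real list() uses OMIT rather than None as the default for continuation_token, so a real loop would omit the argument on the first call.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Page:
    """Hypothetical stand-in for ListResponse."""

    storage_paths: tuple[str, ...]
    continuation_token: Optional[str]


async def fake_list(paths: list[str], limit: int, token: Optional[str] = None) -> Page:
    # Toy pager: the token is simply the index of the next item.
    start = int(token) if token is not None else 0
    chunk = tuple(paths[start:start + limit])
    nxt = start + limit
    return Page(chunk, str(nxt) if nxt < len(paths) else None)


async def list_all(paths: list[str], limit: int) -> list[str]:
    collected: list[str] = []
    page = await fake_list(paths, limit)
    collected.extend(page.storage_paths)
    # Keep requesting pages until no token is returned.
    while page.continuation_token is not None:
        page = await fake_list(paths, limit, page.continuation_token)
        collected.extend(page.storage_paths)
    return collected
```
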

PutRepository

Bases: AtRepository, ValidateDuplicatesMixin

Repository for upserting files in storage.

This repository implements the "put" operation, which uploads files to the configured storage (e.g. S3) within the current Unit of Work; a file already registered at the same path is replaced.

Example::

file1 = TextContent.from_content("This is the first file.")
await uow.user_files.at("user1/").put(file1)

Behavior:

  • Each file is registered in the backend as a pending operation
  • Files are uploaded concurrently via the connector
  • The backend commit happens when uow.commit() runs

Raises:

  • FilepathsCollisionError: If multiple files with the same file path are passed in a single call
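
The body of validate_duplicates_from_files is not shown on this page. A check of that kind could be sketched as follows, assuming it compares the file paths passed in one call. validate_unique_paths is hypothetical, and FilepathsCollisionError is defined locally as a stand-in rather than imported from FennFlow.

```python
from collections import Counter


class FilepathsCollisionError(ValueError):
    """Local stand-in for FennFlow's exception of the same name."""


def validate_unique_paths(paths: list[str]) -> None:
    # Count each path and report every one that appears more than once.
    dupes = sorted(p for p, n in Counter(paths).items() if n > 1)
    if dupes:
        raise FilepathsCollisionError(f"Duplicate file paths: {dupes}")
```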

Source code in src/fennflow/repositories/put.py
class PutRepository(AtRepository, ValidateDuplicatesMixin):
    """Repository for upserting files in the storage.

    This repository implements the "put" operation, which uploads new files
    to the configured storage (e.g. S3) within the current Unit of Work.

    **Example**::

        file1 = TextContent.from_content("This is the first file.")
        await uow.user_files.at("user1/").put(file1)

    **Behavior**:

    - Each file is registered in the backend as a pending operation
    - Files are uploaded via the connector
    - Backend commit is executed on uow.commit

    **Raises**:
        FilepathsCollisionError:
            If files with the same filepath are passed

    """

    async def put(
        self,
        *files: BinaryMedia,
        **provider_extra,
    ) -> None:
        self.validate_duplicates_from_files(files)
        tasks = []
        operations = []
        for file in files:
            file._storage_prefix = self.cwd

            operation = await self._uow.backend.get(
                scope=self._uow._resolved_config.backend.scope,
                namespace=self.repo_extra["namespace"],
                storage_path=file.storage_path,
            )

            operation = OperationRecord.from_uow(
                uow=self._uow,
                operation_type=OperationTypeEnum.PUT,
                storage_path=file.storage_path,
                context=self.__get_context(operation, file),
                repo_extra=self.repo_extra,
            )

            await self._uow.backend.insert(
                operation,
                on_conflict=OnConflictDoEnum.REPLACE,
            )
            tasks.append(
                self._uow._operation_executor.execute(
                    operation,
                    **provider_extra,
                ),
            )
            operations.append(operation)
        await self._uow.backend.flush(operations=operations)
        await asyncio.gather(*tasks)

    def __get_context(
        self,
        operation: OperationRecord | None,
        file: BinaryMedia,
    ) -> PutContext:
        if (
            operation
            and operation.record.is_pending
            and operation.record.session_id == self._uow._session_id
            and operation.record.is_put_type
        ):
            tmp_path = operation.context.tmp_path
            ctx = PutContext(
                file=file,
                tmp_path=tmp_path,
            )
        else:
            ctx = PutContext(
                file=file,
            )

        return ctx
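
__get_context reuses the temporary path of an earlier operation only when that operation is pending, belongs to the current session, and is itself a put. In every other case a fresh PutContext is built. Presumably this lets repeated puts of the same path within one session share a single staging object. The decision can be sketched in isolation. PendingOp and choose_tmp_path are hypothetical. The uuid-based fallback path is an illustrative assumption; the source simply omits tmp_path and lets PutContext supply its own value.

```python
import uuid
from dataclasses import dataclass
from typing import Optional


@dataclass
class PendingOp:
    """Hypothetical simplification of an existing OperationRecord."""

    is_pending: bool
    session_id: str
    is_put: bool
    tmp_path: str


def choose_tmp_path(existing: Optional[PendingOp], session_id: str) -> str:
    # Reuse the earlier temporary path only when all four conditions hold,
    # mirroring the checks in PutRepository.__get_context.
    if (
        existing is not None
        and existing.is_pending
        and existing.session_id == session_id
        and existing.is_put
    ):
        return existing.tmp_path
    # Assumed fallback for illustration: generate a fresh temporary path.
    return f".tmp/{uuid.uuid4().hex}"
```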

RepoField

Bases: Generic[RepoType]

A descriptor that lazily initializes a repository instance on a UnitOfWork.

Parameters:

  • repo_cls (type[RepoType], required): The repository class to instantiate.
  • namespace (Namespace, required, keyword-only): The storage namespace (e.g. S3 bucket name) for this repository.

Example:

class UOW(UnitOfWork):
    user_files = RepoField(UserFiles, namespace="user-files")
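
RepoField follows the standard Python descriptor pattern. __set_name__ records the attribute name, and __get__ builds the repository on first access and caches it in the instance's __dict__. A stripped-down analogue with hypothetical LazyField, Repo and Owner classes behaves the same way:

```python
from typing import Any


class Repo:
    """Hypothetical repository that records its owner and namespace."""

    def __init__(self, owner: Any, namespace: str) -> None:
        self.owner = owner
        self.namespace = namespace


class LazyField:
    """Hypothetical, stripped-down analogue of RepoField."""

    def __init__(self, repo_cls: type, *, namespace: str) -> None:
        self.repo_cls = repo_cls
        self.namespace = namespace

    def __set_name__(self, owner: type, name: str) -> None:
        # Called once at class creation; remembers the attribute name.
        self.name = name

    def __get__(self, instance: Any, owner: type) -> Any:
        if instance is None:
            return self  # class-level access returns the descriptor itself
        repo = instance.__dict__.get(self.name)
        if repo is None:
            # First access on this instance: build and cache the repository.
            repo = self.repo_cls(instance, self.namespace)
            instance.__dict__[self.name] = repo
        return repo


class Owner:
    files = LazyField(Repo, namespace="user-files")
```

LazyField, like RepoField as shown, defines __get__ but no __set__, so it is a non-data descriptor. Once the repository is cached in instance.__dict__, ordinary attribute lookup finds it there and __get__ is not called again. Each unit-of-work instance therefore gets its own repository object.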

Source code in src/fennflow/repositories/fields/base.py
class RepoField(Generic[RepoType]):
    """A descriptor that lazily initializes a repository instance on a UnitOfWork.

    Args:
        repo_cls: The repository class to instantiate.
        namespace: The storage namespace (e.g. S3 bucket name) for this repository.

    Example:
        class UOW(UnitOfWork):
            user_files = RepoField(UserFiles, namespace="user-files")

    """

    def __init__(
        self,
        repo_cls: type[RepoType],
        *,
        namespace: Namespace,
    ):
        self.repo_cls = repo_cls
        self.repo_extra: RepoExtra = {"namespace": namespace}

    def __set_name__(
        self,
        owner,
        name,
    ):
        self.name = name

    @overload
    def __get__(
        self,
        instance: None,
        owner,
    ) -> RepoField[RepoType]: ...

    @overload
    def __get__(
        self,
        instance,
        owner,
    ) -> RepoType: ...

    def __get__(
        self,
        instance,
        owner,
    ):
        if instance is None:
            return self

        repo = instance.__dict__.get(self.name)
        if repo is None:
            repo = self.repo_cls(
                uow=instance,
                path="/",
                repo_extra=self.repo_extra,
            )
            instance.__dict__[self.name] = repo

        return repo

S3RepoField(repo_cls, bucket_name)

Create a RepoField configured for S3 storage.

Parameters:

  • repo_cls (type[RepoType], required): The repository class to instantiate.
  • bucket_name (BucketName, required): The S3 bucket name; an alias for RepoField's namespace argument.

Returns:

  • RepoField[RepoType]: A configured RepoField bound to the given repository class.

Example::

class UOW(UnitOfWork):
    user_files = S3RepoField(UserFiles, bucket_name="my-bucket")
Source code in src/fennflow/repositories/fields/s3.py
def S3RepoField(
    repo_cls: type[RepoType],
    bucket_name: BucketName,
) -> RepoField[RepoType]:
    """Create a RepoField configured for S3 storage.

    Args:
        repo_cls: The repository class to instantiate.
        bucket_name: alias for RepoField.namespace.

    Returns:
        A configured RepoField bound to the given repository class.

    **Example**::

        class UOW(UnitOfWork):
            user_files = S3RepoField(UserFiles, bucket_name="my-bucket")

    """
    return RepoField(
        repo_cls,
        namespace=bucket_name,
    )

AudioContent

Bases: BinaryContent

Media content representing an audio file.

Attributes:

  • duration (int | None): Duration of the audio in seconds, if known.

Source code in src/fennflow/files/media/audio_content.py
class AudioContent(BinaryContent):
    """Media content representing an audio file.

    Attributes:
        duration: Duration of the audio in seconds, if known.
    """

    duration: int | None = None

BinaryContent

Bases: BaseContent

Base class for binary content types.

Source code in src/fennflow/files/media/binary_content.py
class BinaryContent(BaseContent):
    """Base class for binary content type."""

    data: bytes = Field(repr=False)

    @property
    def data_size_mb(self) -> float:
        size_bytes = len(self.data)
        size_mb = round(size_bytes / (1024 * 1024), 2)
        return size_mb

    @field_serializer("data")
    def ser_data(self, v: bytes, _info):
        return base64.b64encode(v).decode("ascii")

    @field_validator("data", mode="before")
    @classmethod
    def de_data(cls, v):
        if isinstance(v, str):
            return base64.b64decode(v)
        return v
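
The serializer/validator pair means data is written out as a base64 ASCII string and accepted back either as raw bytes or as such a string. The same round trip, plus the data_size_mb arithmetic, can be sketched with the standard library alone. encode_data, decode_data and size_mb are hypothetical helper names, not part of FennFlow.

```python
import base64
from typing import Union


def encode_data(data: bytes) -> str:
    # Like ser_data: bytes -> base64 text that is safe inside JSON.
    return base64.b64encode(data).decode("ascii")


def decode_data(value: Union[bytes, str]) -> bytes:
    # Like de_data: strings are treated as base64, bytes pass through.
    if isinstance(value, str):
        return base64.b64decode(value)
    return value


def size_mb(data: bytes) -> float:
    # Same arithmetic as data_size_mb: bytes / 1 MiB, rounded to 2 places.
    return round(len(data) / (1024 * 1024), 2)
```

One consequence follows from the source. de_data treats any str as base64, so passing ordinary text as data does not store that text. TextContent.from_content() is the route for strings.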

ContentFactory

Factory for creating media content instances from raw data.

Resolves the appropriate content class from the registry based on MIME type, falling back to BinaryContent for unknown types.

Example:

content = ContentFactory.from_bytes("text/plain", b"Hello, World!")
url = ContentFactory.from_url("https://example.com/file.txt")

Source code in src/fennflow/files/factory.py
class ContentFactory:
    """Factory for creating media content instances from raw data.

    Resolves the appropriate content class from the registry based on
    MIME type, falling back to ``BinaryContent`` for unknown types.

    Example:
        content = ContentFactory.from_bytes("text/plain", b"Hello, World!")
        url = ContentFactory.from_url("https://example.com/file.txt")
    """

    @staticmethod
    @cache
    def _get_prefixes() -> list[str]:
        """Return registry prefixes sorted by length descending for match resolution."""
        return sorted(
            (p for p in content_registry if p.endswith("/")),
            key=len,
            reverse=True,
        )

    @classmethod
    def from_bytes(
        cls,
        media_type: str,
        data: bytes,
        **kwargs: Any,
    ) -> BinaryMedia:
        """Create a media content instance from raw bytes.

        Resolves the content class from the registry by exact MIME type match,
        then by prefix match, falling back to ``BinaryContent`` if no match is found.

        Args:
            media_type: The MIME type of the content (e.g. ``"text/plain"``).
            data: The raw bytes to wrap.
            **kwargs: Additional fields passed to the content model.

        Returns:
            A media content instance appropriate for the given MIME type.

        Raises:
            TypeError: If ``data`` is not bytes.
            ValueError: If the resolved content class fails validation.
        """
        if not isinstance(data, bytes):
            raise TypeError(f"Factory expected bytes, got {type(data)=} instead.")

        payload = {
            "media_type": media_type,
            "data": data,
            **kwargs,
        }

        if media_type in content_registry:
            content_cls = content_registry[media_type]
        else:
            for prefix in cls._get_prefixes():
                if media_type.startswith(prefix):
                    content_cls = content_registry[prefix]
                    break
            else:
                content_cls = BinaryContent

        try:
            return content_cls.model_validate(payload)
        except ValidationError as exc:
            raise ValueError(
                f"Failed to validate {content_cls.__name__=} for {media_type=}"
            ) from exc

    @staticmethod
    def from_url(
        url: str,
        media_type: str = "application/octet-stream",
        **kwargs: Any,
    ) -> UrlContent:
        """Create a ``UrlContent`` instance from a URL string.

        Args:
            url: The URL string to wrap.
            media_type: The MIME type of the resource.
                Defaults to ``"application/octet-stream"``.
            **kwargs: Additional fields passed to the content model.

        Returns:
            A ``UrlContent`` instance wrapping the given URL.

        Raises:
            ValueError: If the URL fails validation.
        """
        payload = {
            "data": url,
            "media_type": media_type,
            **kwargs,
        }
        try:
            return UrlContent.model_validate(payload)
        except ValidationError as exc:
            raise ValueError(f"Failed to create UrlContent for {url=}") from exc

from_bytes(media_type, data, **kwargs) classmethod

Create a media content instance from raw bytes.

Resolves the content class from the registry by exact MIME type match, then by prefix match, falling back to BinaryContent if no match is found.
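
from_bytes checks three things in order. It tries an exact MIME type match, then registry keys ending in "/" (longest first), and finally falls back to BinaryContent. A sketch of that resolution follows. The registry is hypothetical, since the real content_registry entries are not shown here, and strings stand in for the content classes.

```python
from functools import cache

# Hypothetical registry; the real content_registry contents are not shown.
REGISTRY = {
    "application/json": "JsonContent",
    "text/plain": "TextContent",
    "application/": "DocumentContent",
    "image/": "ImageContent",
    "audio/": "AudioContent",
}


@cache
def prefixes() -> tuple[str, ...]:
    # Keys ending in "/" are prefixes; try the longest ones first.
    return tuple(
        sorted((p for p in REGISTRY if p.endswith("/")), key=len, reverse=True)
    )


def resolve(media_type: str) -> str:
    if media_type in REGISTRY:  # 1. exact MIME type
        return REGISTRY[media_type]
    for prefix in prefixes():  # 2. longest matching prefix
        if media_type.startswith(prefix):
            return REGISTRY[prefix]
    return "BinaryContent"  # 3. fallback for unknown types
```

As in the source, the prefix list is computed once and cached, so prefixes added to the registry after the first lookup would not be seen. The sketch returns a tuple rather than the source's list so the cached value cannot be mutated by callers.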

Parameters:

  • media_type (str, required): The MIME type of the content (e.g. "text/plain").
  • data (bytes, required): The raw bytes to wrap.
  • **kwargs (Any, default {}): Additional fields passed to the content model.

Returns:

  • BinaryMedia: A media content instance appropriate for the given MIME type.

Raises:

  • TypeError: If data is not bytes.
  • ValueError: If the resolved content class fails validation.


from_url(url, media_type='application/octet-stream', **kwargs) staticmethod

Create a UrlContent instance from a URL string.

Parameters:

  • url (str, required): The URL string to wrap.
  • media_type (str, default 'application/octet-stream'): The MIME type of the resource.
  • **kwargs (Any, default {}): Additional fields passed to the content model.

Returns:

  • UrlContent: A UrlContent instance wrapping the given URL.

Raises:

  • ValueError: If the URL fails validation.


DocumentContent

Bases: BinaryContent

Media content representing a document.

Source code in src/fennflow/files/media/document_content.py
class DocumentContent(BinaryContent):
    """Media content representing a document."""

ImageContent

Bases: BinaryContent

Media content representing an image file.

Attributes:

  • height (int | None): Height of the image in pixels, if known.
  • width (int | None): Width of the image in pixels, if known.

Source code in src/fennflow/files/media/image_content.py
class ImageContent(BinaryContent):
    """Media content representing an image file.

    Attributes:
        height: Height of the image in pixels, if known.
        width: Width of the image in pixels, if known.
    """

    height: int | None = None
    width: int | None = None

JsonContent

Bases: BinaryContent, FromContentAbstract, ContentPropertyAbstract, Generic[Value]

Media content representing a JSON file.

Stores JSON data internally as encoded bytes (UTF-8 by default, configurable via encoding). Use from_content() to create an instance from a JSON-serializable Python object.

Attributes:

  • encoding (str): The text encoding. Defaults to "utf-8".

Example::

file = JsonContent.from_content({"key": "value"})
print(file.content)  # {'key': 'value'}
await uow.user_files.at("user1/").put(file)
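
Under the hood, from_content serializes with json.dumps and encodes the result, and content decodes and parses it again. The sketch mirrors those two steps with hypothetical encode_json/decode_json helpers. With the default ensure_ascii=False, non-ASCII characters are stored as raw encoded bytes rather than \uXXXX escapes.

```python
import json
from typing import Any, Optional, Union


def encode_json(
    value: Any,
    encoding: str = "utf-8",
    ensure_ascii: bool = False,
    indent: Optional[Union[int, str]] = None,
) -> bytes:
    # Same steps as JsonContent.from_content: serialize, then encode.
    return json.dumps(value, ensure_ascii=ensure_ascii, indent=indent).encode(encoding)


def decode_json(data: bytes, encoding: str = "utf-8") -> Any:
    # Same steps as the .content property: decode, then parse.
    return json.loads(data.decode(encoding))
```

Standard JSON round-tripping applies: a tuple comes back as a list, and dictionary keys come back as strings.
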
Source code in src/fennflow/files/media/json_content.py
class JsonContent(
    BinaryContent,
    FromContentAbstract,
    ContentPropertyAbstract,
    Generic[Value],
):
    """Media content representing a JSON file.

    Stores JSON data as UTF-8 encoded bytes internally.
    Use ``from_content()`` to create from a Python object.

    Attributes:
        encoding: The text encoding. Defaults to ``"utf-8"``.

    Example::

        file = JsonContent.from_content({"key": "value"})
        print(file.content)  # {'key': 'value'}
        await uow.user_files.at("user1/").put(file)
    """

    encoding: str = "utf-8"

    @property
    def content(self) -> Value:
        text = self.data.decode(self.encoding)
        return json.loads(text)

    @classmethod
    def from_content(
        cls,
        data: Value,
        media_type: str = "application/json",
        encoding: str = "utf-8",
        filename: Omittable[str] = OMIT,
        ensure_ascii: bool = False,
        indent: int | str | None = None,
        **extra_json_dumps_kwargs,
    ) -> JsonContent[Value]:
        dumped_data = json.dumps(
            data,
            ensure_ascii=ensure_ascii,
            indent=indent,
            **extra_json_dumps_kwargs,
        )

        extra = {}
        if is_given(filename):
            extra["filename"] = filename

        return cls(
            data=dumped_data.encode(encoding),
            media_type=media_type,
            encoding=encoding,
            **extra,
        )
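Together, from_content() and the content property amount to json.dumps followed by str.encode, and bytes.decode followed by json.loads. The sketch below reproduces that round trip using only the standard library. dump_json_bytes and load_json_bytes are hypothetical helpers, not FennFlow functions, and the sample payload is made up. The sketch shows how ensure_ascii changes the stored bytes and how extra keyword arguments such as sort_keys pass through to json.dumps.

```python
from __future__ import annotations

import json
from typing import Any


# Hypothetical helpers mirroring the steps JsonContent.from_content() and
# JsonContent.content perform; they are NOT part of FennFlow.
def dump_json_bytes(
    data: Any,
    encoding: str = "utf-8",
    ensure_ascii: bool = False,
    indent: int | str | None = None,
    **extra_json_dumps_kwargs: Any,
) -> bytes:
    # Serialize to a str first, forwarding any extra json.dumps options...
    text = json.dumps(
        data,
        ensure_ascii=ensure_ascii,
        indent=indent,
        **extra_json_dumps_kwargs,
    )
    # ...then encode that str into the bytes that would be stored.
    return text.encode(encoding)


def load_json_bytes(raw: bytes, encoding: str = "utf-8") -> Any:
    # Reverse direction: decode the bytes, then parse the JSON text.
    return json.loads(raw.decode(encoding))


payload = {"city": "Zürich", "b": 2, "a": 1}

compact = dump_json_bytes(payload)                     # "ü" kept as raw UTF-8 bytes
escaped = dump_json_bytes(payload, ensure_ascii=True)  # "ü" escaped as \u00fc
sorted_pretty = dump_json_bytes(payload, indent=2, sort_keys=True)

print(compact)
print(escaped)
print(load_json_bytes(sorted_pretty) == payload)  # True
```

With ensure_ascii=False (the default in from_content()), non-ASCII characters are stored as their UTF-8 bytes, which are shorter than the six-character \uXXXX escapes produced when ensure_ascii=True.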

TextContent

Bases: BinaryContent, FromContentAbstract, ContentPropertyAbstract

Media content representing a plain text file.

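Stores text internally as bytes encoded with encoding (UTF-8 by default). Use from_content() to create an instance from a string. The content property decodes the bytes and raises ValueError if they are not valid in that encoding.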

Attributes:

Name      Type  Description
encoding  str   The text encoding. Defaults to "utf-8".

Example::

file = TextContent.from_content("Hello, World!")
print(file.content)  # Hello, World!
await uow.user_files.at("user1/").put(file)
Source code in src/fennflow/files/media/text_content.py
class TextContent(
    BinaryContent,
    FromContentAbstract,
    ContentPropertyAbstract,
):
    """Media content representing a plain text file.

    Stores text as UTF-8 encoded bytes internally.
    Use ``from_content()`` to create from a string.

    Attributes:
        encoding: The text encoding. Defaults to ``"utf-8"``.

    Example::

        file = TextContent.from_content("Hello, World!")
        print(file.content)  # Hello, World!
        await uow.user_files.at("user1/").put(file)
    """

    encoding: str = "utf-8"

    @property
    def content(self) -> str:
        try:
            return self.data.decode(self.encoding)
        except UnicodeDecodeError as e:
            raise ValueError(f"Cannot extract text from {self=}") from e

    @classmethod
    def from_content(
        cls,
        data: str,
        media_type: str = "text/plain",
        encoding: str = "utf-8",
        filename: Omittable[str] = OMIT,
        **kwargs,  # noqa: ARG003
    ) -> Self:
        extra = {}

        if is_given(filename):
            extra["filename"] = filename

        return cls(
            data=data.encode(encoding),
            media_type=media_type,
            encoding=encoding,
            **extra,
        )
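TextContent encodes once in from_content() and decodes on every access to content, turning a UnicodeDecodeError into a ValueError. The stand-in below reproduces that behavior with a plain dataclass so it runs without FennFlow installed. TextBlob is a hypothetical name, not the library class, and the sample strings are illustrative.

```python
from dataclasses import dataclass


# Hypothetical stand-in reproducing TextContent's encode/decode behavior;
# it is NOT the FennFlow class.
@dataclass
class TextBlob:
    data: bytes
    encoding: str = "utf-8"

    @classmethod
    def from_content(cls, text: str, encoding: str = "utf-8") -> "TextBlob":
        # Same idea as TextContent.from_content(): encode once, store bytes.
        return cls(data=text.encode(encoding), encoding=encoding)

    @property
    def content(self) -> str:
        # Same idea as TextContent.content: decode on access and convert a
        # UnicodeDecodeError into a ValueError, keeping the original as cause.
        try:
            return self.data.decode(self.encoding)
        except UnicodeDecodeError as e:
            raise ValueError(f"Cannot extract text from {self=}") from e


# A non-default encoding round-trips as long as the same encoding is used.
ok = TextBlob.from_content("Grüße", encoding="utf-16")
print(ok.content)

# Latin-1 bytes for "ü" (0xFC) are not valid UTF-8, so decoding fails.
broken = TextBlob(data="Grüße".encode("latin-1"))
try:
    _ = broken.content
except ValueError as err:
    print(type(err.__cause__).__name__)
```

Because the stored encoding travels with the bytes, a round trip succeeds for any codec. Reading bytes that were produced with a different codec surfaces as a ValueError rather than a UnicodeDecodeError.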

UrlContent

Bases: BaseContent

Media content representing a URL.

Attributes:

Name  Type  Description
data  str   The URL string.

Source code in src/fennflow/files/media/url_content.py
class UrlContent(BaseContent):
    """Media content representing a URL.

    Attributes:
        data: The URL string.
    """

    data: str

VideoContent

Bases: BinaryContent

Media content representing a video file.

Attributes:

Name      Type        Description
duration  int | None  Duration of the video in seconds.
height    int | None  Height of the video in pixels.
width     int | None  Width of the video in pixels.

Source code in src/fennflow/files/media/video_content.py
class VideoContent(BinaryContent):
    """Media content representing a video file.

    Attributes:
        duration: Duration of the video in seconds.
        height: Height of the video in pixels.
        width: Width of the video in pixels.
    """

    duration: int | None = None
    height: int | None = None
    width: int | None = None