|
10 | 10 | from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
|
11 | 11 | from ray.data._internal.execution.interfaces import TaskContext
|
12 | 12 | from ray.data.block import Block, BlockAccessor
|
13 |
| -from ray.data.datasource.datasink import Datasink |
| 13 | +from ray.data.datasource.datasink import Datasink, WriteResult |
14 | 14 | from ray.data.datasource.filename_provider import FilenameProvider
|
15 | 15 | from ray.types import ObjectRef
|
16 | 16 |
|
|
21 | 21 | _logger: logging.Logger = logging.getLogger(__name__)
|
22 | 22 |
|
23 | 23 |
|
24 |
| -class _BlockFileDatasink(Datasink): |
| 24 | +class _BlockFileDatasink(Datasink[str]): |
25 | 25 | def __init__(
|
26 | 26 | self,
|
27 | 27 | path: str,
|
@@ -91,12 +91,12 @@ def write_block(self, file: Any, block: BlockAccessor) -> None:
|
91 | 91 | # and is meant to be used for singular actions like
|
92 | 92 | # [committing a transaction](https://docs.ray.io/en/latest/data/api/doc/ray.data.Datasource.html).
|
93 | 93 | # As deceptive as it may look, there is no race condition here.
|
94 |
| - def on_write_complete(self, write_results: list[Any], **_: Any) -> None: |
| 94 | + def on_write_complete(self, write_results: WriteResult[str]) -> None: |
95 | 95 | """Execute callback after all write tasks complete."""
|
96 | 96 | _logger.debug("Write complete %s.", write_results)
|
97 | 97 |
|
98 | 98 | # Collect and return all write task paths
|
99 |
| - self._write_paths.extend(write_results) |
| 99 | + self._write_paths.extend(write_results.write_returns) |
100 | 100 |
|
101 | 101 | def get_write_paths(self) -> list[str]:
|
102 | 102 | """Return S3 paths of where the results have been written."""
|
|
0 commit comments