Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve performance of rw #59

Merged
merged 9 commits into from
Dec 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions alluxiofs/client/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE = "alluxio.common.ondemandpool.disable"
LIST_URL_FORMAT = "http://{worker_host}:{http_port}/v1/files"
FULL_PAGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/file/{path_id}/page/{page_index}?ufsFullPath={file_path}"
FULL_RANGE_URL_FORMAT = "http://{worker_host}:{http_port}/v1/range/{path_id}?ufsFullPath={file_path}&offset={offset}&length={length}"
FULL_CHUNK_URL_FORMAT = "http://{worker_host}:{http_port}/v1/chunk/{path_id}?ufsFullPath={file_path}&chunkSize={chunk_size}"
PAGE_URL_FORMAT = (
"http://{worker_host}:{http_port}/v1/file/{path_id}"
Expand Down
103 changes: 85 additions & 18 deletions alluxiofs/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
CP_URL_FORMAT,
FULL_CHUNK_URL_FORMAT,
WRITE_CHUNK_URL_FORMAT,
FULL_RANGE_URL_FORMAT,
)
from .const import ALLUXIO_COMMON_ONDEMANDPOOL_DISABLE
from .const import ALLUXIO_COMMON_EXTENSION_ENABLE
Expand Down Expand Up @@ -73,6 +74,7 @@ class AlluxioPathStatus:
last_modification_time_ms: int
human_readable_file_size: str
length: int
content_hash: str = None


class LoadState(Enum):
Expand Down Expand Up @@ -257,6 +259,7 @@ def get_file_status(self, path):
- last_modification_time_ms (long): the last modification time
- length (integer): length of the file or 0 for directory
- human_readable_file_size (string): the size of the human readable files
- content_hash (string): the hash of the file content

Example:
{
Expand All @@ -267,6 +270,7 @@ def get_file_status(self, path):
last_modification_time_ms: 0,
length: 0,
human_readable_file_size: '0B'
content_hash: 'd41d8cd98f00b204e9800998ecf8427e'
}
"""
self._validate_path(path)
Expand All @@ -284,6 +288,8 @@ def get_file_status(self, path):
)
response.raise_for_status()
data = json.loads(response.content)[0]
if data.get("mContentHash") is None:
data["mContentHash"] = '"'
return AlluxioPathStatus(
data["mType"],
data["mName"],
Expand All @@ -292,6 +298,7 @@ def get_file_status(self, path):
data["mLastModificationTimeMs"],
data["mHumanReadableFileSize"],
data["mLength"],
data["mContentHash"].strip('"'),
)
except Exception as e:
raise Exception(
Expand Down Expand Up @@ -443,6 +450,46 @@ def read(self, file_path):
Args:
file_path (str): The full ufs file path to read data from

Returns:
file content (bytes): The full file content
"""
try:
file_status = self.get_file_status(file_path)
if file_status is None:
raise FileNotFoundError(f"File {file_path} not found")
return self.read_range(file_path, 0, file_status.length)
# self._validate_path(file_path)
# worker_host, worker_http_port = self._get_preferred_worker_address(
# file_path
# )
# path_id = self._get_path_hash(file_path)
# try:
# if self.data_manager:
# return b"".join(
# self._all_page_generator_alluxiocommon(
# worker_host, worker_http_port, path_id, file_path
# )
# )
# else:
# return b"".join(
# self._all_page_generator(
# worker_host, worker_http_port, path_id, file_path
# )
# )
Comment on lines +461 to +478
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be better to delete these comments.

except Exception as e:
raise Exception(
f"Error when reading file {file_path}: error {e}"
) from e

def read_file_range(self, file_path, offset=0, length=-1):
"""
Reads a byte range of the file.

Args:
file_path (str): The full ufs file path to read data from
offset (integer): The offset to start reading data from
length (integer): The file length to read

Returns:
file content (bytes): The file content in the requested range
"""
Expand All @@ -453,16 +500,17 @@ def read(self, file_path):
path_id = self._get_path_hash(file_path)
try:
if self.data_manager:
return b"".join(
self._all_page_generator_alluxiocommon(
worker_host, worker_http_port, path_id, file_path
)
self._all_chunk_generator_alluxiocommon(
worker_host, worker_http_port, path_id, file_path
)
else:
return b"".join(
self._all_page_generator(
worker_host, worker_http_port, path_id, file_path
)
return self._all_file_range_generator(
worker_host,
worker_http_port,
path_id,
file_path,
offset,
length,
)
except Exception as e:
raise Exception(
Expand Down Expand Up @@ -604,7 +652,7 @@ def write(self, file_path, file_bytes):
Write a byte[] content to the file.
Args:
file_path (str): The full ufs file path to read data from
file_bytes (str): The full ufs file content
file_bytes (bytes): The full ufs file content
Returns:
True if the write was successful, False otherwise.
"""
Expand Down Expand Up @@ -851,12 +899,12 @@ def cp(self, path1, path2, option):
except requests.RequestException as e:
raise Exception(f"Error copy a file from {path1} to {path2}: {e}")

def tail(self, file_path, numOfBytes=None):
def tail(self, file_path, num_of_bytes=None):
"""
Show the tail of the file located at 'file_path'.
Args:
path1: The ufs path of the file.
path2: The length of the file to show (like 1kb).
file_path: The ufs path of the file.
num_of_bytes: The length of the file to show (like 1kb).
Returns:
The content of tail of the file.
"""
Expand All @@ -873,18 +921,18 @@ def tail(self, file_path, numOfBytes=None):
path_id=path_id,
file_path=file_path,
),
params={"numBytes": numOfBytes},
params={"numOfBytes": num_of_bytes},
)
return b"".join(response.iter_content())
except requests.RequestException as e:
raise Exception(f"Error show the tail of {file_path}: {e}")

def head(self, file_path, numOfBytes=None):
def head(self, file_path, num_of_bytes=None):
"""
Show the head of the file located at 'file_path'.
Args:
path1: The ufs path of the file.
path2: The length of the file to show (like 1kb).
file_path: The ufs path of the file.
num_of_bytes: The length of the file to show (like 1kb).
Returns:
The content of head of the file.
"""
Expand All @@ -901,7 +949,7 @@ def head(self, file_path, numOfBytes=None):
path_id=path_id,
file_path=file_path,
),
params={"numBytes": numOfBytes},
params={"numBytes": num_of_bytes},
)
return b"".join(response.iter_content())
except requests.RequestException as e:
Expand Down Expand Up @@ -968,7 +1016,6 @@ def _all_page_generator(
if len(page_content) < self.config.page_size: # last page
break
page_index += 1
print(f"page_index:{page_index} is done")

def _all_page_generator_write(
self, worker_host, worker_http_port, path_id, file_path, file_bytes
Expand Down Expand Up @@ -1131,6 +1178,26 @@ def _range_page_generator(
# read some data successfully, return those data
break

def _all_file_range_generator(
    self, worker_host, worker_http_port, path_id, file_path, offset, length
):
    """Fetch one contiguous byte range of a file from a worker over HTTP.

    Despite the "generator" name (kept for interface compatibility), this
    performs a single request and returns the whole range at once.

    Args:
        worker_host: Hostname of the worker serving the file.
        worker_http_port: HTTP port of that worker.
        path_id: Hash identifier of the ufs path.
        file_path: The full ufs file path.
        offset: Byte offset at which the range starts.
        length: Number of bytes to read.

    Returns:
        bytes: The raw response body for the requested range.

    Raises:
        Exception: Wraps any failure (connection error or non-2xx status),
            chaining the original exception as the cause.
    """
    try:
        range_url = FULL_RANGE_URL_FORMAT.format(
            worker_host=worker_host,
            http_port=worker_http_port,
            path_id=path_id,
            file_path=file_path,
            offset=offset,
            length=length,
        )
        # NOTE(review): requests.get has no timeout here, so a stalled
        # worker can block indefinitely — confirm whether that is intended.
        resp = requests.get(range_url)
        resp.raise_for_status()
        return resp.content
    except Exception as e:
        raise Exception(
            f"Error when reading file {path_id} with offset {offset} and length {length}: error {e}"
        ) from e

def _create_session(self, concurrency):
session = requests.Session()
adapter = HTTPAdapter(
Expand Down
Loading
Loading