Skip to content

Commit

Permalink
feat: Simplified code
Browse files Browse the repository at this point in the history
  • Loading branch information
micmurawski committed Feb 5, 2024
1 parent 943d62e commit b930f83
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 79 deletions.
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def get_dependencies(subpackage="requirements"):

setup(
name='cloud_array',
version='0.0.6',
version='0.0.7',
author="Michal Murawski",
author_email="mmurawski777@gmail.com",
description="Cloud implementation of array for Big Data",
Expand Down
85 changes: 7 additions & 78 deletions src/cloud_array/array.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,11 @@
from itertools import product
from typing import AnyStr, Dict, List, Sequence, Tuple
from typing import AnyStr, Dict, Sequence, Tuple

import numpy as np

from cloud_array.backends import Backend, get_backend
from cloud_array.exceptions import CloudArrayException
from cloud_array.utils import (chunk2list, collect, compute_index_of_slice, compute_number_of_chunks,
get_index_of_iter_product)
from cloud_array.helpers import (chunk2list, collect, compute_index_of_slice, compute_number_of_chunks,
generate_chunks_slices, get_chunk_slice_by_index, parse_key_to_slices)


class Chunk:
Expand Down Expand Up @@ -112,30 +111,11 @@ def get_metadata(self) -> dict:
return result

def generate_chunks_slices(self) -> Tuple[slice]:
_ranges = (
range(0, a, c)
for c, a in zip(self.chunk_shape, self.shape)
)
p = product(*_ranges)
for i in p:
yield tuple(
slice(
i[j],
min(self.shape[j], i[j]+self.chunk_shape[j])
)
for j in range(len(self.shape))
)
for _slice in generate_chunks_slices(self.shape, self.chunk_shape):
yield _slice

def get_chunk_slice_by_index(self, number: int) -> Tuple[slice]:
p = tuple((0, a, c) for c, a in zip(self.chunk_shape, self.shape))
val = get_index_of_iter_product(number, p)
return tuple(
slice(
val[j],
min(self.shape[j], val[j]+self.chunk_shape[j])
)
for j in range(len(self.shape))
)
return get_chunk_slice_by_index(self.shape, self.chunk_shape, number)

@staticmethod
def count_number_of_chunks(shape: Tuple[int], chunk_shape: Tuple[int]) -> int:
Expand All @@ -161,59 +141,8 @@ def save(self, array=None) -> None:
for chunk in self.chunks():
chunk.save(array[chunk.slice])

def initial_merge_of_chunks(self, sorted_chunks) -> List[Tuple[np.ndarray, Tuple[slice]]]:
datasets = []
for x in sorted_chunks:
data = None
for i in x[0]:
chunk_data = self.get_chunk(i)[:, :, :]
if data is None:
data = chunk_data
else:
data = np.concatenate(
(data, chunk_data),
axis=x[1]
)
datasets.append(
(data, x[2])
)
return datasets

def parse_key_to_slices(self, key: Tuple[slice]):
result = []
for i in range(len(key)):
val = key[i]
if isinstance(val, int):
if val < 0:
val = self.shape[i] + val
result.append(
slice(val, val+1)
)
else:
start = val.start or 0
stop = val.stop or self.shape[i]
if start > self.shape[i] or stop > self.shape[i]:
raise CloudArrayException(
f"Slice {key[i]} does not fit shape: {self.shape}.")
if start >= stop:
raise CloudArrayException(
f"Key invalid slice {key[i]}. Start >= stop.")
if start < 0:
start = self.shape[i] + start
if stop < 0:
stop = self.shape[i] + stop

result.append(
slice(
start,
stop,
val.step if val.step else 1
)
)
return tuple(result)

def __getitem__(self, key) -> np.ndarray:
new_key = self.parse_key_to_slices(key)
new_key = parse_key_to_slices(self.shape, self.chunk_shape, key)

def _get_chunk_data_by_key(key: Sequence[slice]):
idx = compute_index_of_slice(key, self.shape, self.chunk_shape)
Expand Down
65 changes: 65 additions & 0 deletions src/cloud_array/utils.py → src/cloud_array/helpers.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import operator
from copy import copy
from functools import reduce
from itertools import product
from math import ceil
from typing import Callable, List, Sequence, Tuple

import numpy as np

from .exceptions import CloudArrayException


def compute_number_of_chunks(shape: Tuple[int], chunk_shape: Tuple[int]) -> int:
"""
Expand Down Expand Up @@ -81,3 +84,65 @@ def chunk2list(chunk: Tuple[slice]) -> List[List[int]]:

def list2chunk(_list: List[List[int]]) -> Tuple[slice]:
return tuple([slice(*el) for el in _list])


def generate_chunks_slices(shape: Sequence[int], chunk_shape: Sequence[int]) -> Tuple[slice]:
_ranges = (
range(0, a, c)
for c, a in zip(chunk_shape, shape)
)
p = product(*_ranges)
for i in p:
yield tuple(
slice(
i[j],
min(shape[j], i[j]+chunk_shape[j])
)
for j in range(len(shape))
)


def get_chunk_slice_by_index(shape: Sequence[int], chunk_shape: Sequence[int], number: int) -> Tuple[slice]:
p = tuple((0, a, c) for c, a in zip(chunk_shape, shape))
val = get_index_of_iter_product(number, p)
return tuple(
slice(
val[j],
min(shape[j], val[j]+chunk_shape[j])
)
for j in range(len(shape))
)


def parse_key_to_slices(shape: Sequence[int], chunk_shape: Sequence[int], key: Tuple[slice]):
result = []
for i in range(len(key)):
val = key[i]
if isinstance(val, int):
if val < 0:
val = shape[i] + val
result.append(
slice(val, val+1)
)
else:
start = val.start or 0
stop = val.stop or shape[i]
if start > shape[i] or stop > shape[i]:
raise CloudArrayException(
f"Slice {key[i]} does not fit shape: {shape}.")
if start >= stop:
raise CloudArrayException(
f"Key invalid slice {key[i]}. Start >= stop.")
if start < 0:
start = shape[i] + start
if stop < 0:
stop = shape[i] + stop

result.append(
slice(
start,
stop,
val.step if val.step else 1
)
)
return tuple(result)

0 comments on commit b930f83

Please sign in to comment.