Skip to content
This repository has been archived by the owner on Jul 17, 2024. It is now read-only.

Commit

Permalink
feat: introduce fairness
Browse files Browse the repository at this point in the history
  • Loading branch information
triceo committed Jun 27, 2024
1 parent 5aa2cbd commit 2c443dc
Show file tree
Hide file tree
Showing 2 changed files with 276 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,20 @@ def concat(self, other):
else:
raise RuntimeError(f'Unhandled constraint stream type {type(other)}.')

def complement(self, cls: type[A]) -> 'UniConstraintStream[A]':
"""
Adds to the stream all instances of a given class which are not yet present in it.
These instances must be present in the solution,
which means the class needs to be either a planning entity or a problem fact.
Parameters
----------
cls : Type[A]
the type of the instances to add to the stream.
"""
result = self.delegate.complement(get_class(cls))
return TriConstraintCollector(result, self.package, self.a_type)

def penalize(self, constraint_weight: ScoreType, match_weigher: Callable[[A], int] = None) -> \
'UniConstraintBuilder[A, ScoreType]':
"""
Expand Down Expand Up @@ -1000,6 +1014,40 @@ def concat(self, other):
else:
raise RuntimeError(f'Unhandled constraint stream type {type(other)}.')

@overload
def complement(self, cls: type[A]) -> 'BiConstraintStream[A, B]':
...

@overload
def complement(self, cls: type[A], padding: Callable[[A], B]) -> 'BiConstraintStream[A, B]':
...

def complement(self, cls: type[A], padding=None):
"""
Adds to the stream all instances of a given class which are not yet present in it.
These instances must be present in the solution,
which means the class needs to be either a planning entity or a problem fact.
The instances will be read from the first element of the input tuple.
When an output tuple needs to be created for the newly inserted instances,
the first element will be the new instance.
The rest of the tuple will be padded with the result of the padding function.
Parameters
----------
cls : Type[A]
the type of the instances to add to the stream.
padding : Callable[[A], B]
a function that computes the padding value for the second fact in the new tuple.
"""
if None == padding:
result = self.delegate.complement(get_class(cls))
return TriConstraintCollector(result, self.package, self.a_type, self.b_type)
java_padding = function_cast(padding, self.a_type)
result = self.delegate.complement(get_class(cls), java_padding)
return TriConstraintCollector(result, self.package, self.a_type, self.b_type)

def penalize(self, constraint_weight: ScoreType, match_weigher: Callable[[A, B], int] = None) -> \
'BiConstraintBuilder[A, B, ScoreType]':
"""
Expand Down Expand Up @@ -1544,6 +1592,51 @@ def concat(self, other):
else:
raise RuntimeError(f'Unhandled constraint stream type {type(other)}.')

@overload
def complement(self, cls: type[A]) -> 'TriConstraintStream[A, B, C]':
...

@overload
def complement(self, cls: type[A], padding_b: Callable[[A], B], padding_c: Callable[[A], C]) \
-> 'TriConstraintStream[A, B, C]':
...

def complement(self, cls: type[A], padding_b=None, padding_c=None):
"""
Adds to the stream all instances of a given class which are not yet present in it.
These instances must be present in the solution,
which means the class needs to be either a planning entity or a problem fact.
The instances will be read from the first element of the input tuple.
When an output tuple needs to be created for the newly inserted instances,
the first element will be the new instance.
The rest of the tuple will be padded with the result of the padding function,
applied on the new instance.
Padding functions are optional, but if one is provided, then both must-be provided.
Parameters
----------
cls : Type[A]
the type of the instances to add to the stream.
padding_b : Callable[[A], B]
a function that computes the padding value for the second fact in the new tuple.
padding_c : Callable[[A], C]
a function that computes the padding value for the third fact in the new tuple.
"""
if None == padding_b == padding_c:
result = self.delegate.complement(get_class(cls))
return TriConstraintCollector(result, self.package, self.a_type, self.b_type, self.c_type)
specified_count = sum(x is not None for x in [padding_b, padding_c])
if specified_count != 0:
raise ValueError(f'If a padding function is provided, both are expected, got {specified_count} instead.')
java_padding_b = function_cast(padding_b, self.a_type)
java_padding_c = function_cast(padding_c, self.a_type)
result = self.delegate.complement(get_class(cls), java_padding_b, java_padding_c)
return TriConstraintCollector(result, self.package, self.a_type, self.b_type, self.c_type)

def penalize(self, constraint_weight: ScoreType,
match_weigher: Callable[[A, B, C], int] = None) -> 'TriConstraintBuilder[A, B, C, ScoreType]':
"""
Expand Down Expand Up @@ -2016,7 +2109,6 @@ def map(self, *mapping_functions):
JClass('java.lang.Object'))
if len(mapping_functions) == 4:
return QuadConstraintStream(self.delegate.map(*translated_functions), self.package,

JClass('java.lang.Object'), JClass('java.lang.Object'),
JClass('java.lang.Object'), JClass('java.lang.Object'))
raise RuntimeError(f'Impossible state: missing case for {len(mapping_functions)}.')
Expand All @@ -2027,7 +2119,6 @@ def flatten_last(self, flattening_function) -> 'QuadConstraintStream[A,B,C,D]':
"""
translated_function = function_cast(flattening_function, self.d_type)
return QuadConstraintStream(self.delegate.flattenLast(translated_function), self.package,

self.a_type, self.b_type, self.c_type, JClass('java.lang.Object'))

def distinct(self) -> 'QuadConstraintStream[A,B,C,D]':
Expand Down Expand Up @@ -2083,6 +2174,55 @@ def concat(self, other):
else:
raise RuntimeError(f'Unhandled constraint stream type {type(other)}.')

@overload
def complement(self, cls: type[A]) -> 'QuadConstraintStream[A, B, C, D]':
...

@overload
def complement(self, cls: type[A], padding_b: Callable[[A], B], padding_c: Callable[[A], C],
padding_d: Callable[[A], D]) -> 'QuadConstraintStream[A, B, C, D]':
...

def complement(self, cls: type[A], padding_b=None, padding_c=None, padding_d=None):
"""
Adds to the stream all instances of a given class which are not yet present in it.
These instances must be present in the solution,
which means the class needs to be either a planning entity or a problem fact.
The instances will be read from the first element of the input tuple.
When an output tuple needs to be created for the newly inserted instances,
the first element will be the new instance.
The rest of the tuple will be padded with the result of the padding function,
applied on the new instance.
Padding functions are optional, but if one is provided, then all three must-be provided.
Parameters
----------
cls : Type[A]
the type of the instances to add to the stream.
padding_b : Callable[[A], B]
a function that computes the padding value for the second fact in the new tuple.
padding_c : Callable[[A], C]
a function that computes the padding value for the third fact in the new tuple.
padding_d : Callable[[A], D]
a function that computes the padding value for the fourth fact in the new tuple.
"""
if None == padding_b == padding_c == padding_d:
result = self.delegate.complement(get_class(cls))
return QuadConstraintCollector(result, self.package, self.a_type, self.b_type, self.c_type, self.d_type)
specified_count = sum(x is not None for x in [padding_b, padding_c, padding_d])
if specified_count != 0:
raise ValueError(f'If a padding function is provided, all 3 are expected, got {specified_count} instead.')
java_padding_b = function_cast(padding_b, self.a_type)
java_padding_c = function_cast(padding_c, self.a_type)
java_padding_d = function_cast(padding_d, self.a_type)
result = self.delegate.complement(get_class(cls), java_padding_b, java_padding_c, java_padding_d)
return QuadConstraintCollector(result, self.package, self.a_type, self.b_type, self.c_type, self.d_type)

def penalize(self, constraint_weight: ScoreType,
match_weigher: Callable[[A, B, C, D], int] = None) -> 'QuadConstraintBuilder[A, B, C, D, ScoreType]':
"""
Expand Down
134 changes: 134 additions & 0 deletions timefold-solver-python-core/src/main/python/score/_group_by.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import Callable, Any, Sequence, TypeVar, List, Set, Dict, TYPE_CHECKING, overload
if TYPE_CHECKING:
from ai.timefold.solver.core.api.score.stream.common import SequenceChain
from ai.timefold.solver.core.api.score.stream.common import LoadBalance
from ai.timefold.solver.core.api.score.stream.uni import UniConstraintCollector
from ai.timefold.solver.core.api.score.stream.bi import BiConstraintCollector
from ai.timefold.solver.core.api.score.stream.tri import TriConstraintCollector
Expand Down Expand Up @@ -61,6 +62,14 @@ class CollectAndThenCollector:
mapping_function: Callable


@dataclasses.dataclass
class LoadBalanceCollector:
collector_creator: Callable
balanced_item_function: Callable
load_function: Callable | None
initial_load_function: Callable | None


def extract_collector(collector_info, *type_arguments):
if isinstance(collector_info, NoArgsConstraintCollector):
return collector_info.collector_creator()
Expand Down Expand Up @@ -89,6 +98,15 @@ def extract_collector(collector_info, *type_arguments):
delegate_collector = extract_collector(collector_info.delegate_collector, *type_arguments)
mapping_function = function_cast(collector_info.mapping_function, JClass('java.lang.Object'))
return collector_info.collector_creator(delegate_collector, mapping_function)
elif isinstance(collector_info, LoadBalanceCollector):
balanced_item_function = function_cast(collector_info.balanced_item_function, *type_arguments)
if collector_info.load_function is None:
return collector_info.collector_creator(balanced_item_function)
load_function = function_cast(collector_info.load_function, *type_arguments)
if collector_info.initial_load_function is None:
return collector_info.collector_creator(balanced_item_function, load_function)
initial_load_function = function_cast(collector_info.initial_load_function, *type_arguments)
return collector_info.collector_creator(balanced_item_function, load_function, initial_load_function)
else:
raise ValueError(f'Invalid Collector: {collector_info}. '
f'Create Collectors via timefold.solver.constraint.ConstraintCollectors.')
Expand Down Expand Up @@ -135,13 +153,15 @@ class ConstraintCollectors:
C = TypeVar('C')
D = TypeVar('D')
E = TypeVar('E')
Balanced = TypeVar('Balanced')

# Method return type variables
A_ = TypeVar('A_')
B_ = TypeVar('B_')
C_ = TypeVar('C_')
D_ = TypeVar('D_')
E_ = TypeVar('E_')
Balanced_ = TypeVar('Balanced_')

@staticmethod
def _delegate():
Expand Down Expand Up @@ -993,6 +1013,120 @@ def to_sorted_map(key_mapper, value_mapper, merge_function_or_set_creator=None):
else:
raise ValueError

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A], Balanced_]) -> \
'UniConstraintCollector[A, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A], Balanced_], load_function: Callable[[A], int]) -> \
'UniConstraintCollector[A, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A], Balanced_], load_function: Callable[[A], int],
initial_load_function: Callable[[A], int]) -> \
'UniConstraintCollector[A, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B], Balanced_]) -> \
'BiConstraintCollector[A, B, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B], Balanced_], load_function: Callable[[A, B], int]) -> \
'BiConstraintCollector[A, B, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B], Balanced_], load_function: Callable[[A, B], int],
initial_load_function: Callable[[A, B], int]) -> \
'BiConstraintCollector[A, B, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C], Balanced_]) -> \
'TriConstraintCollector[A, B, C, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C], Balanced_],
load_function: Callable[[A, B, C], int]) -> \
'TriConstraintCollector[A, B, C, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C], Balanced_], load_function: Callable[[A, B, C], int],
initial_load_function: Callable[[A, B, C], int]) -> \
'TriConstraintCollector[A, B, C, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C, D], Balanced_]) -> \
'QuadConstraintCollector[A, B, C, D, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C, D], Balanced_],
load_function: Callable[[A, B, C, D], int]) -> \
'QuadConstraintCollector[A, B, C, D, Any, LoadBalance[Balanced_]]':
...

@overload
@staticmethod
def load_balance(balanced_item_function: Callable[[A, B, C, D], Balanced_],
load_function: Callable[[A, B, C, D], int],
initial_load_function: Callable[[A, B, C, D], int]) -> \
'QuadConstraintCollector[A, B, C, D, Any, LoadBalance[Balanced_]]':
...

@staticmethod
def load_balance(balanced_item_function, load_function=None, initial_load_function=None):
"""
Returns a collector that takes a stream of items and calculates the unfairness measure from them.
The load for every item is provided by the load_function,
with the starting load provided by the initial_load_function.
When this collector is used in a constraint stream,
it is recommended to use a score type which supports real numbers.
This is so that the unfairness measure keeps its precision
without forcing the other constraints to be multiplied by a large constant,
which would otherwise be required to implement fixed-point arithmetic.
Parameters
----------
balanced_item_function:
The function that returns the item which should be load-balanced.
load_function:
How much the item should count for in the formula.
initial_load_function:
The initial value of the metric, allowing to provide initial state
without requiring the entire previous planning windows in the working memory.
If this function is provided, load_function must be provided as well.
"""
if None == load_function == initial_load_function:
return LoadBalanceCollector(ConstraintCollectors._delegate().loadBalance, balanced_item_function, None,
None)
elif None == initial_load_function:
return LoadBalanceCollector(ConstraintCollectors._delegate().loadBalance, balanced_item_function,
load_function, None)
elif None == load_function:
raise ValueError("load_function cannot be None if initial_load_function is not None")
else:
return LoadBalanceCollector(ConstraintCollectors._delegate().loadBalance, balanced_item_function,
load_function, initial_load_function)

# Must be at the bottom, constraint_stream depends on this module
from ._constraint_stream import *
Expand Down

0 comments on commit 2c443dc

Please sign in to comment.