Skip to content

Commit

Permalink
Rename internal batch_info variable to previous_batch_results (#1…
Browse files Browse the repository at this point in the history
…1056) (#11062)

* Rename `batch_info` to `previous_batch_results`

* Exclude `previous_batch_results` from serialization of model node to avoid jinja context bloat

* Drop `previous_batch_results` key from `test_manifest.py` unit tests

In 4050e37 we began excluding
`previous_batch_results` from the serialized representation of the
ModelNode. As such, we no longer need to check for it in `test_manifest.py`.

(cherry picked from commit 0f084e1)

Co-authored-by: Quigley Malcolm <QMalcolm@users.noreply.github.com>
  • Loading branch information
github-actions[bot] and QMalcolm authored Nov 27, 2024
1 parent aceae51 commit 65f05e0
Show file tree
Hide file tree
Showing 4 changed files with 10 additions and 9 deletions.
4 changes: 3 additions & 1 deletion core/dbt/contracts/graph/nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -444,13 +444,15 @@ def resource_class(cls) -> Type[HookNodeResource]:

@dataclass
class ModelNode(ModelResource, CompiledNode):
batch_info: Optional[BatchResults] = None
previous_batch_results: Optional[BatchResults] = None
_has_this: Optional[bool] = None

def __post_serialize__(self, dct: Dict, context: Optional[Dict] = None):
dct = super().__post_serialize__(dct, context)
if "_has_this" in dct:
del dct["_has_this"]
if "previous_batch_results" in dct:
del dct["previous_batch_results"]
return dct

@classmethod
Expand Down
2 changes: 1 addition & 1 deletion core/dbt/task/retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,7 @@ def run(self):
# batch info if there were _no_ successful batches previously. This is
# because passing the batch info _forces_ the microbatch process into
# _incremental_ model, and it may be that we need to be in full refresh
# mode which is only handled if batch_info _isn't_ passed for a node
# mode which is only handled if previous_batch_results _isn't_ passed for a node
batch_map = {
result.unique_id: result.batch_results
for result in self.previous_results.results
Expand Down
12 changes: 6 additions & 6 deletions core/dbt/task/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,8 @@ def merge_batch_results(self, result: RunResult, batch_results: List[RunResult])
result.batch_results.failed = sorted(result.batch_results.failed)

# # If retrying, propagate previously successful batches into final result, even thoguh they were not run in this invocation
if self.node.batch_info is not None:
result.batch_results.successful += self.node.batch_info.successful
if self.node.previous_batch_results is not None:
result.batch_results.successful += self.node.previous_batch_results.successful

def _build_succesful_run_batch_result(
self,
Expand Down Expand Up @@ -508,15 +508,15 @@ def _execute_microbatch_materialization(
)

if self.batch_idx is None:
# Note currently (9/30/2024) model.batch_info is only ever _not_ `None`
# Note currently (9/30/2024) model.previous_batch_results is only ever _not_ `None`
# IFF `dbt retry` is being run and the microbatch model had batches which
# failed on the run of the model (which is being retried)
if model.batch_info is None:
if model.previous_batch_results is None:
end = microbatch_builder.build_end_time()
start = microbatch_builder.build_start_time(end)
batches = microbatch_builder.build_batches(start, end)
else:
batches = model.batch_info.failed
batches = model.previous_batch_results.failed
# If there is batch info, then don't run as full_refresh and do force is_incremental
# not doing this risks blowing away the work that has already been done
if self._has_relation(model=model):
Expand Down Expand Up @@ -885,7 +885,7 @@ def populate_microbatch_batches(self, selected_uids: AbstractSet[str]):
if uid in self.batch_map:
node = self.manifest.ref_lookup.perform_lookup(uid, self.manifest)
if isinstance(node, ModelNode):
node.batch_info = self.batch_map[uid]
node.previous_batch_results = self.batch_map[uid]

def before_run(self, adapter: BaseAdapter, selected_uids: AbstractSet[str]) -> RunStatus:
with adapter.connection_named("master"):
Expand Down
1 change: 0 additions & 1 deletion tests/unit/contracts/graph/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@
"deprecation_date",
"defer_relation",
"time_spine",
"batch_info",
}
)

Expand Down

0 comments on commit 65f05e0

Please sign in to comment.