Skip to content

Commit

Permalink
feat: introduce v2_{processed,finished} states
Browse files Browse the repository at this point in the history
I want to introduce the new finished state in the test results upload
pipeline.

To do so safely, I'm adding the new v2_processed and v2_finished states.

The reason for this is that the meanings of processed and v2_processed
are different.

processed means that test instances are in the DB and persisted.
In the new pipeline, v2_finished has that meaning, while v2_processed
just means that the intermediate results are in Redis for now.
  • Loading branch information
joseph-sentry committed Dec 20, 2024
1 parent 73d8c44 commit 1296951
Show file tree
Hide file tree
Showing 5 changed files with 17 additions and 17 deletions.
3 changes: 2 additions & 1 deletion services/processing/flake_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,9 @@ def process_flake_for_repo_commit(
):
uploads = ReportSession.objects.filter(
report__report_type=CommitReport.ReportType.TEST_RESULTS.value,
report__commit__repository__repoid=repo_id,
report__commit__commitid=commit_id,
state="processed",
state__in=["processed", "v2_finished"],
).all()

curr_flakes = fetch_curr_flakes(repo_id)
Expand Down
8 changes: 2 additions & 6 deletions tasks/ta_finisher.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,17 +232,13 @@ def process_impl_within_lock(
.filter(
CommitReport.commit_id == commit.id,
CommitReport.report_type == ReportType.TEST_RESULTS.value,
Upload.state == "v2_processed",
)
.all()
)

redis_client = get_redis_connection()

tests_to_write: dict[str, dict[str, Any]] = {}
test_instances_to_write: list[dict[str, Any]] = []
daily_totals: dict[str, DailyTotals] = dict()
test_flag_bridge_data: list[dict] = []

repo_flakes: list[Flake] = (
db_session.query(Flake.testid)
.filter(Flake.repoid == repoid, Flake.end_date.is_(None))
Expand Down Expand Up @@ -364,7 +360,7 @@ def process_impl_within_lock(
extra=dict(upload_id=upload.id),
)

upload.state = "finished"
upload.state = "v2_finished"
db_session.commit()

redis_client.delete(intermediate_key)
Expand Down
7 changes: 4 additions & 3 deletions tasks/ta_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ def process_individual_upload(
upload_id = upload.id

log.info("Processing individual upload", extra=dict(upload_id=upload_id))
if upload.state == "processed" or upload.state == "has_failed":
if upload.state == "v2_processed" or upload.state == "v2_failed":
return False

payload_bytes = archive_service.read_file(upload.storage_path)
Expand All @@ -102,16 +102,17 @@ def process_individual_upload(
),
)
sentry_sdk.capture_exception(exc, tags={"upload_state": upload.state})
upload.state = "has_failed"
upload.state = "v2_failed"
db_session.commit()
return False
else:
redis_client.set(
f"ta/intermediate/{repository.repoid}/{commitid}/{upload_id}",
bytes(msgpacked),
ex=60 * 60,
)

upload.state = "processed"
upload.state = "v2_processed"
db_session.commit()

log.info(
Expand Down
8 changes: 5 additions & 3 deletions tasks/tests/unit/test_process_flakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -311,8 +311,10 @@ def test_it_handles_only_passes(transactional_db):
def test_it_creates_flakes_from_processed_uploads(transactional_db):
rs = RepoSimulator()
c1 = rs.create_commit()
rs.add_test_instance(c1)
rs.add_test_instance(c1, outcome=TestInstance.Outcome.FAILURE.value)
rs.add_test_instance(c1, state="v2_finished")
rs.add_test_instance(
c1, outcome=TestInstance.Outcome.FAILURE.value, state="processed"
)

rs.run_task(rs.repo.repoid, c1.commitid)

Expand All @@ -329,7 +331,7 @@ def test_it_creates_flakes_from_processed_uploads(transactional_db):
def test_it_does_not_create_flakes_from_flake_processed_uploads(transactional_db):
rs = RepoSimulator()
c1 = rs.create_commit()
rs.add_test_instance(c1)
rs.add_test_instance(c1, state="v2_processed")
rs.add_test_instance(
c1, outcome=TestInstance.Outcome.FAILURE.value, state="flake_processed"
)
Expand Down
8 changes: 4 additions & 4 deletions tasks/tests/unit/test_ta_processor_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def test_ta_processor_task_call(
)

assert result is True
assert upload.state == "processed"
assert upload.state == "v2_processed"

redis = get_redis_connection()
a = redis.get(
Expand Down Expand Up @@ -195,7 +195,7 @@ def test_ta_processor_task_error_parsing_file(
)

assert result is False
assert upload.state == "has_failed"
assert upload.state == "v2_failed"

@pytest.mark.integration
def test_ta_processor_task_delete_archive(
Expand Down Expand Up @@ -325,7 +325,7 @@ def test_ta_processor_task_call_already_processed(
with open(here.parent.parent / "samples" / "sample_test.json") as f:
content = f.read()
mock_storage.write_file("archive", url, content)
upload = UploadFactory.create(storage_path=url, state="processed")
upload = UploadFactory.create(storage_path=url, state="v2_processed")
dbsession.add(upload)
dbsession.flush()
argument = {"url": url, "upload_id": upload.id_}
Expand Down Expand Up @@ -415,4 +415,4 @@ def test_ta_processor_task_call_already_processed_with_junit(
)

assert result is True
assert upload.state == "processed"
assert upload.state == "v2_processed"

0 comments on commit 1296951

Please sign in to comment.