Skip to content

Commit

Permalink
AIP-72: Add logging for exception in Task Runner (apache#45502)
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored Jan 9, 2025
1 parent 0480623 commit 9de80ab
Showing 1 changed file with 12 additions and 2 deletions.
14 changes: 12 additions & 2 deletions task_sdk/src/airflow/sdk/execution_time/task_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,8 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# etc
msg = TaskState(state=TerminalTIState.SUCCESS, end_date=datetime.now(tz=timezone.utc))
except TaskDeferred as defer:
# TODO: Should we use structlog.bind_contextvars here for dag_id, task_id & run_id?
log.info("Pausing task as DEFERRED. ", dag_id=ti.dag_id, task_id=ti.task_id, run_id=ti.run_id)
classpath, trigger_kwargs = defer.trigger.serialize()
next_method = defer.method_name
defer_timeout = defer.timeout
Expand All @@ -457,19 +459,22 @@ def run(ti: RuntimeTaskInstance, log: Logger):
next_method=next_method,
trigger_timeout=defer_timeout,
)
except AirflowSkipException:
except AirflowSkipException as e:
if e.args:
log.info("Skipping task.", reason=e.args[0])
msg = TaskState(
state=TerminalTIState.SKIPPED,
end_date=datetime.now(tz=timezone.utc),
)
except AirflowRescheduleException as reschedule:
log.info("Rescheduling task, marking task as UP_FOR_RESCHEDULE")
msg = RescheduleTask(
reschedule_date=reschedule.reschedule_date, end_date=datetime.now(tz=timezone.utc)
)
except (AirflowFailException, AirflowSensorTimeout):
# If AirflowFailException is raised, task should not retry.
# If a sensor in reschedule mode reaches timeout, task should not retry.

log.exception("Task failed with exception")
# TODO: Handle fail_stop here: https://github.com/apache/airflow/issues/44951
# TODO: Handle addition to Log table: https://github.com/apache/airflow/issues/44952
msg = TaskState(
Expand All @@ -479,13 +484,15 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# TODO: Run task failure callbacks here
except (AirflowTaskTimeout, AirflowException):
# We should allow retries if the task has defined it.
log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
# TODO: Run task failure callbacks here
except AirflowException:
# TODO: handle the case of up_for_retry here
log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
Expand All @@ -494,19 +501,22 @@ def run(ti: RuntimeTaskInstance, log: Logger):
# External state updates are already handled with `ti_heartbeat` and will be
# updated already be another UI API. So, these exceptions should ideally never be thrown.
# If these are thrown, we should mark the TI state as failed.
log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAIL_WITHOUT_RETRY,
end_date=datetime.now(tz=timezone.utc),
)
# TODO: Run task failure callbacks here
except SystemExit:
# SystemExit needs to be retried if they are eligible.
log.exception("Task failed with exception")
msg = TaskState(
state=TerminalTIState.FAILED,
end_date=datetime.now(tz=timezone.utc),
)
# TODO: Run task failure callbacks here
except BaseException:
log.exception("Task failed with exception")
# TODO: Run task failure callbacks here
msg = TaskState(state=TerminalTIState.FAILED, end_date=datetime.now(tz=timezone.utc))
if msg:
Expand Down

0 comments on commit 9de80ab

Please sign in to comment.