Skip to content

Commit

Permalink
fix: Use airbyte state message format + tracking (#18)
Browse files Browse the repository at this point in the history
This change will fix incremental extraction for a few TAPs, like
tap-amplitude and tap-s3.

 - Adds a new entry to state tracking json, 'airbyte_state'.
- This new entry is populated on any invocation, and contains airbyte's
way to track state as documented here:
https://docs.airbyte.com/understanding-airbyte/database-data-catalog.
- If this entry does not exist, we fallback to what was originally being
done.
- If this entry does exist, we use this data as the input state.json to
airbyte's docker container.

---------

Co-authored-by: Edgar Ramírez Mondragón <16805946+edgarrmondragon@users.noreply.github.com>
  • Loading branch information
JichaoS and edgarrmondragon authored Sep 6, 2024
1 parent c3d1620 commit e573ee8
Showing 1 changed file with 64 additions and 4 deletions.
68 changes: 64 additions & 4 deletions tap_airbyte/tap.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

from __future__ import annotations

from copy import deepcopy
import errno
import os
import shutil
Expand Down Expand Up @@ -570,8 +571,15 @@ def run_read(self) -> t.Iterator[subprocess.Popen]:
catalog.write(orjson.dumps(self.configured_airbyte_catalog))
if self.airbyte_state:
with open(f"{host_tmpdir}/state.json", "wb") as state:
self.logger.debug("Using state: %s", self.airbyte_state)
state.write(orjson.dumps(self.airbyte_state))
# Use the new airbyte state container if it exists.
state_dict = self.airbyte_state
if 'airbyte_state' in self.airbyte_state:
# This is airbyte state V2
state_dict = self.airbyte_state['airbyte_state']

self.logger.debug("Using state: %s", state_dict)
state.write(orjson.dumps(state_dict))

runtime_conf_dir = host_tmpdir if self.is_native() else self.airbyte_mount_dir
proc = subprocess.Popen(
self.to_command(
Expand Down Expand Up @@ -789,8 +797,55 @@ def sync_all(self) -> None:
):
self._process_log_message(airbyte_message)
elif airbyte_message["type"] == AirbyteMessage.STATE:
state_message = airbyte_message["state"]
# See: https://docs.airbyte.com/understanding-airbyte/database-data-catalog
# for how this state should be handled.
state_message = deepcopy(airbyte_message["state"])
state_type = state_message["type"]

if "airbyte_state" not in self.airbyte_state:
self.airbyte_state["airbyte_state"] = []

# The airbyte_state_v2 here should adhere to the link above.
existing_airbyte_state_v2: list[dict] = deepcopy(self.airbyte_state["airbyte_state"])
if state_type == "STREAM":
stream_descriptor = state_message["stream"]["stream_descriptor"]
stream_state = state_message["stream"]["stream_state"]

# Update the state for this stream descriptor or add it to the list.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "STREAM" and existing_state["stream"]["stream_descriptor"] == stream_descriptor:
existing_state["stream"]["stream_state"] = stream_state
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "STREAM",
"stream": state_message["stream"]
})
elif state_type == "GLOBAL":
# Update the global state.
found = False
for existing_state in existing_airbyte_state_v2:
if existing_state["type"] == "GLOBAL":
existing_state["global"] = state_message["global"]
found = True
break
if not found:
existing_airbyte_state_v2.append({
"type": "GLOBAL",
"global": state_message["global"]
})
elif state_type == "LEGACY":
# One record per connector.
existing_airbyte_state_v2.clear()
existing_airbyte_state_v2.append(
{
"type": "LEGACY",
"legacy": state_message["legacy"]
}
)

if "data" in state_message:
unpacked_state = state_message["data"]
elif state_type == "STREAM":
Expand All @@ -799,7 +854,12 @@ def sync_all(self) -> None:
unpacked_state = state_message["global"]
elif state_type == "LEGACY":
unpacked_state = state_message["legacy"]
self.airbyte_state = unpacked_state

# Keep the legacy state behavior, but append the new state under a new key.
# Deepcopy here since existing_airbyte_state_v2 can reference the same object.
self.airbyte_state = deepcopy(unpacked_state)
self.airbyte_state['airbyte_state'] = existing_airbyte_state_v2

with STDOUT_LOCK:
singer.write_message(singer.StateMessage(self.airbyte_state))
else:
Expand Down

0 comments on commit e573ee8

Please sign in to comment.