From 5f46c20ddfd656939a47ae4845a35b211b661d57 Mon Sep 17 00:00:00 2001 From: ismail simsek <6005685+ismailsimsek@users.noreply.github.com> Date: Fri, 5 Jul 2024 13:28:35 +0200 Subject: [PATCH] Add opendbt code and tests --- .github/dependabot.yml | 10 + .github/workflows/tests.yml | 41 +++ .gitignore | 84 +++++ .idea/.gitignore | 3 + .idea/git_toolbox_blame.xml | 6 + .idea/git_toolbox_prj.xml | 15 + .idea/misc.xml | 78 +++++ .idea/modules.xml | 8 + .idea/opendbt.iml | 11 + .../Python_tests_in_tests.xml | 17 + .idea/runConfigurations/pip_install.xml | 17 + .idea/vcs.xml | 6 + opendbt/__init__.py | 106 ++++++ opendbt/airflow/__init__.py | 179 ++++++++++ opendbt/client.py | 118 +++++++ opendbt/macros/execute.sql | 42 +++ opendbt/macros/executepython.sql | 26 ++ setup.py | 30 ++ tests/custom_adapters.py | 36 ++ tests/resources/airflow/Dockerfile | 12 + tests/resources/airflow/airflow/airflow.cfg | 308 ++++++++++++++++++ .../airflow/airflow/webserver_config.py | 114 +++++++ tests/resources/airflow/dags/dbt_workflow.py | 32 ++ tests/resources/airflow/docker-compose.yaml | 22 ++ tests/resources/dbttest/.gitignore | 4 + tests/resources/dbttest/README.md | 3 + tests/resources/dbttest/analyses/.gitkeep | 0 tests/resources/dbttest/dbt_project.yml | 40 +++ tests/resources/dbttest/macros/.gitkeep | 0 .../models/example/my_first_dbt_model.sql | 27 ++ .../models/example/my_second_dbt_model.sql | 6 + .../dbttest/models/example/schema.yml | 21 ++ .../dbttest/models/my_execute_dbt_model.sql | 7 + .../models/my_executepython_dbt_model.py | 22 ++ tests/resources/dbttest/profiles.yml | 15 + tests/resources/dbttest/seeds/.gitkeep | 0 tests/resources/dbttest/snapshots/.gitkeep | 0 tests/resources/dbttest/tests/.gitkeep | 0 tests/test_airflow.py | 60 ++++ tests/test_custom_adapter.py | 41 +++ tests/test_execute_materialization.py | 13 + tests/test_executepython_materialization.py | 19 ++ tests/test_opendbt_project.py | 24 ++ 43 files changed, 1623 insertions(+) create mode 100644 .github/dependabot.yml create mode 100644 .github/workflows/tests.yml create mode 100644 .idea/.gitignore create mode 100644 .idea/git_toolbox_blame.xml create mode 100644 .idea/git_toolbox_prj.xml create mode 100644 .idea/misc.xml create mode 100644 .idea/modules.xml create mode 100644 .idea/opendbt.iml create mode 100644 .idea/runConfigurations/Python_tests_in_tests.xml create mode 100644 .idea/runConfigurations/pip_install.xml create mode 100644 .idea/vcs.xml create mode 100644 opendbt/__init__.py create mode 100644 opendbt/airflow/__init__.py create mode 100644 opendbt/client.py create mode 100644 opendbt/macros/execute.sql create mode 100644 opendbt/macros/executepython.sql create mode 100644 setup.py create mode 100644 tests/custom_adapters.py create mode 100644 tests/resources/airflow/Dockerfile create mode 100644 tests/resources/airflow/airflow/airflow.cfg create mode 100644 tests/resources/airflow/airflow/webserver_config.py create mode 100644 tests/resources/airflow/dags/dbt_workflow.py create mode 100644 tests/resources/airflow/docker-compose.yaml create mode 100644 tests/resources/dbttest/.gitignore create mode 100644 tests/resources/dbttest/README.md create mode 100644 tests/resources/dbttest/analyses/.gitkeep create mode 100644 tests/resources/dbttest/dbt_project.yml create mode 100644 tests/resources/dbttest/macros/.gitkeep create mode 100644 tests/resources/dbttest/models/example/my_first_dbt_model.sql create mode 100644 tests/resources/dbttest/models/example/my_second_dbt_model.sql create mode 100644 
tests/resources/dbttest/models/example/schema.yml create mode 100644 tests/resources/dbttest/models/my_execute_dbt_model.sql create mode 100644 tests/resources/dbttest/models/my_executepython_dbt_model.py create mode 100644 tests/resources/dbttest/profiles.yml create mode 100644 tests/resources/dbttest/seeds/.gitkeep create mode 100644 tests/resources/dbttest/snapshots/.gitkeep create mode 100644 tests/resources/dbttest/tests/.gitkeep create mode 100644 tests/test_airflow.py create mode 100644 tests/test_custom_adapter.py create mode 100644 tests/test_execute_materialization.py create mode 100644 tests/test_executepython_materialization.py create mode 100644 tests/test_opendbt_project.py diff --git a/.github/dependabot.yml b/.github/dependabot.yml new file mode 100644 index 0000000..cbd920f --- /dev/null +++ b/.github/dependabot.yml @@ -0,0 +1,10 @@ +version: 2 +updates: + - package-ecosystem: "github-actions" + directory: "/" + schedule: + interval: "weekly" + - package-ecosystem: "pip" + directory: "/" + schedule: + interval: "weekly" diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml new file mode 100644 index 0000000..08c8693 --- /dev/null +++ b/.github/workflows/tests.yml @@ -0,0 +1,41 @@ +name: Build and Test + +on: + push: + branches: [ main ] + paths-ignore: + - '.github/**' + - '.idea/**' + - '.run/**' + pull_request: + branches: [ main ] + paths-ignore: + - '.github/**' + - '.idea/**' + - '.run/**' + +jobs: + build: + + runs-on: ubuntu-latest + strategy: + matrix: + python-version: [ "3.8", "3.9", "3.10", "3.11" ] + + steps: + - uses: actions/checkout@v4 + - name: Set up Python ${{ matrix.python-version }} + uses: actions/setup-python@v5 + with: + python-version: ${{ matrix.python-version }} + - name: Build & Install + run: | + python --version + python -m pip install coverage pylint pytest + python -m compileall -f opendbt setup.py + python setup.py -q install --user + + - name: Test with Unittest + run: | + python -m coverage run --source=./tests/ -m unittest discover -s tests/ + python -m coverage report -m ./opendbt/*.py setup.py diff --git a/.gitignore b/.gitignore index 82f9275..831e9d8 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,87 @@ +**.duckdb +**.user.yml + +###### JetBrains ###### +# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider +# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 + +# User-specific stuff +.idea/**/workspace.xml +.idea/**/tasks.xml +.idea/**/usage.statistics.xml +.idea/**/dictionaries +.idea/**/shelf + +# AWS User-specific +.idea/**/aws.xml + +# Generated files +.idea/**/contentModel.xml + +# Sensitive or high-churn files +.idea/**/dataSources/ +.idea/**/dataSources.ids +.idea/**/dataSources.local.xml +.idea/**/sqlDataSources.xml +.idea/**/dynamic.xml +.idea/**/uiDesigner.xml +.idea/**/dbnavigator.xml + +# Gradle +.idea/**/gradle.xml +.idea/**/libraries + +# Gradle and Maven with auto-import +# When using Gradle or Maven with auto-import, you should exclude module files, +# since they will be recreated, and may cause churn. Uncomment if using +# auto-import. 
+# .idea/artifacts +# .idea/compiler.xml +# .idea/jarRepositories.xml +# .idea/modules.xml +# .idea/*.iml +# .idea/modules +# *.iml +# *.ipr + +# CMake +cmake-build-*/ + +# Mongo Explorer plugin +.idea/**/mongoSettings.xml + +# File-based project format +*.iws + +# IntelliJ +out/ + +# mpeltonen/sbt-idea plugin +.idea_modules/ + +# JIRA plugin +atlassian-ide-plugin.xml + +# Cursive Clojure plugin +.idea/replstate.xml + +# SonarLint plugin +.idea/sonarlint/ + +# Crashlytics plugin (for Android Studio and IntelliJ) +com_crashlytics_export_strings.xml +crashlytics.properties +crashlytics-build.properties +fabric.properties + +# Editor-based Rest Client +.idea/httpRequests + +# Android studio 3.1+ serialized cache file +.idea/caches/build_file_checksums.ser + + +###### Python ###### # Byte-compiled / optimized / DLL files __pycache__/ *.py[cod] diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..26d3352 --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,3 @@ +# Default ignored files +/shelf/ +/workspace.xml diff --git a/.idea/git_toolbox_blame.xml b/.idea/git_toolbox_blame.xml new file mode 100644 index 0000000..7dc1249 --- /dev/null +++ b/.idea/git_toolbox_blame.xml @@ -0,0 +1,6 @@ + + + + + \ No newline at end of file diff --git a/.idea/git_toolbox_prj.xml b/.idea/git_toolbox_prj.xml new file mode 100644 index 0000000..02b915b --- /dev/null +++ b/.idea/git_toolbox_prj.xml @@ -0,0 +1,15 @@ + + + + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..6f73b0f --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,78 @@ + + + + + + + + + + + EmbeddedPerformanceJava + + + Error handlingJava + + + Groovy + + + InitializationJava + + + JVM languages + + + Java + + + Java 21Java language level migration aidsJava + + + Java language level migration aidsJava + + + Kotlin + + + LoggingJVM languages + + + MemoryJava + + + PerformanceJava + + + Probable bugsJava + + + Python + + + Redundant constructsKotlin + + + RegExp + + + Style issuesKotlin + + + Threading issuesGroovy + + + Threading issuesJava + + + Verbose or redundant code constructsJava + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..6716111 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/opendbt.iml b/.idea/opendbt.iml new file mode 100644 index 0000000..a2d63e9 --- /dev/null +++ b/.idea/opendbt.iml @@ -0,0 +1,11 @@ + + + + + + + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/Python_tests_in_tests.xml b/.idea/runConfigurations/Python_tests_in_tests.xml new file mode 100644 index 0000000..bb2dad0 --- /dev/null +++ b/.idea/runConfigurations/Python_tests_in_tests.xml @@ -0,0 +1,17 @@ + + + + + \ No newline at end of file diff --git a/.idea/runConfigurations/pip_install.xml b/.idea/runConfigurations/pip_install.xml new file mode 100644 index 0000000..b44bcfc --- /dev/null +++ b/.idea/runConfigurations/pip_install.xml @@ -0,0 +1,17 @@ + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..35eb1dd --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/opendbt/__init__.py b/opendbt/__init__.py new file mode 100644 index 0000000..d68c404 --- /dev/null +++ b/opendbt/__init__.py @@ -0,0 +1,106 @@ +import argparse +import logging +import os +import subprocess +import sys +from pathlib import 
Path + +from dbt.cli.main import dbtRunnerResult +from dbt.contracts.graph.manifest import Manifest + +import opendbt.client + + +class OpenDbtLogger: + _log = None + + @property + def log(self) -> logging.Logger: + if self._log is None: + self._log = logging.getLogger(name="opendbt") + if not self._log.hasHandlers(): + handler = logging.StreamHandler(sys.stdout) + formatter = logging.Formatter("[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s") + handler.setFormatter(formatter) + handler.setLevel(logging.INFO) + self._log.addHandler(handler) + return self._log + + +class OpenDbtProject(OpenDbtLogger): + """ + This class is used to take action on a dbt project. + """ + + DEFAULT_TARGET = 'dev' # development + + def __init__(self, project_dir: Path, target: str = None, profiles_dir: Path = None, args: list = None): + super().__init__() + self.project_dir: Path = project_dir + self.profiles_dir: Path = profiles_dir + self.target: str = target if target else self.DEFAULT_TARGET + self.args = args if args else [] + + def run(self, command: str = "build", target: str = None, args: list = None, use_subprocess: bool = False, + write_json: bool = False) -> dbtRunnerResult: + + run_args = args if args else [] + run_args += ["--target", target if target else self.target] + run_args += ["--project-dir", self.project_dir.as_posix()] + if self.profiles_dir: + run_args += ["--profiles-dir", self.profiles_dir.as_posix()] + run_args = [command] + run_args + self.args + if write_json: + run_args.remove("--no-write-json") + + if use_subprocess: + Utils.runcommand(command=['opendbt'] + run_args) + return None + else: + self.log.info(f"Running `dbt {' '.join(run_args)}`") + return client.OpenDbtCli.run(args=run_args) + + def manifest(self, partial_parse=True, no_write_manifest=True) -> Manifest: + args = [] + if partial_parse: + args += ["--partial-parse"] + if no_write_manifest: + args += ["--no-write-json"] + + result = self.run(command="parse", args=args) + if isinstance(result.result, Manifest): + return result.result + + raise Exception(f"DBT execution did not return Manifest object. returned:{type(result.result)}") + + def generate_docs(self, args: list = None): + _args = ["generate"] + args if args else [] + self.run(command="docs", args=_args) + + +class Utils(object): + + @staticmethod + def runcommand(command: list, shell=False): + logger = OpenDbtLogger() + + logger.log.info("Working dir is %s" % os.getcwd()) + logger.log.info("Running command (shell=%s) `%s`" % (shell, " ".join(command))) + with subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, bufsize=1, + universal_newlines=True, shell=shell) as p: + for line in p.stdout: + if line: + print(line.strip()) + + if p.returncode != 0: + raise subprocess.CalledProcessError(p.returncode, p.args) + + +def main(): + p = argparse.ArgumentParser() + _, args = p.parse_known_args() + client.OpenDbtCli.run(args=args) + + +if __name__ == "__main__": + main() diff --git a/opendbt/airflow/__init__.py b/opendbt/airflow/__init__.py new file mode 100644 index 0000000..73b5082 --- /dev/null +++ b/opendbt/airflow/__init__.py @@ -0,0 +1,179 @@ +from datetime import timedelta +from pathlib import Path +from typing import Tuple + +from airflow import DAG +from airflow.models.baseoperator import BaseOperator +from airflow.operators.empty import EmptyOperator + +import opendbt + + +class OpenDbtExecutorOperator(BaseOperator): + """ + An Airflow operator for executing dbt commands. 
+ """ + + def __init__(self, + project_dir: Path, + command: str, + target: str = None, + profiles_dir: Path = None, + select: str = None, + args: list = None, + execution_timeout=timedelta(minutes=60), **kwargs) -> None: + super().__init__(execution_timeout=execution_timeout, **kwargs) + + self.project_dir: Path = project_dir + self.command = command + self.profiles_dir: Path = profiles_dir + self.target = target + self.args = args if args else [] + + if select: + self.args += ["--select", select] + + # use separate colour for test and other executions + if self.command == "test": + self.ui_color = "#1CB1C2" + else: + self.ui_color = "#0084ff" + + def execute(self, context): + """ + Execute the dbt command. + """ + runner = opendbt.OpenDbtProject(project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target) + runner.run(command=self.command, args=self.args, use_subprocess=True) + + +class OpenDbtAirflowProject(opendbt.OpenDbtProject): + + def load_dbt_tasks(self, + dag: DAG, + start_node: BaseOperator = None, + end_node: BaseOperator = None, + tag: str = None, + resource_type="all", + run_dbt_seeds=False, + run_singular_tests=False) -> Tuple[BaseOperator, BaseOperator]: + """ + This method is used to add dbt tasks to Given DAG. + + Parameters: + dag (DAG): The Airflow DAG object where the dbt tasks will be added. + start_node (BaseOperator, optional): The starting node of the DAG. If not provided, an EmptyOperator will be used. + end_node (BaseOperator, optional): The ending node of the DAG. If not provided, an EmptyOperator will be used. + tag (str, optional): The tag to filter the dbt tasks. If provided, only tasks with this tag will be added to the DAG. + resource_type (str, optional): The type of dbt resource to run. It can be "all", "model", or "test". Default is "all". + run_dbt_seeds (bool, optional): A flag to indicate whether to run dbt seeds before all other dbt jobs. Default is False. + + Returns: + Tuple[BaseOperator, BaseOperator]: The start and end nodes of the DAG after adding the dbt tasks. + """ + + start_node = start_node if start_node else EmptyOperator(task_id='dbt-%s-start' % self.project_dir.name, + dag=dag) + end_node = end_node if end_node else EmptyOperator(task_id='dbt-%s-end' % self.project_dir.name, dag=dag) + + if run_dbt_seeds: + # add dbt seeds job after start node abd before all other dbt jobs + first_node = start_node + start_node = OpenDbtExecutorOperator(dag=dag, + task_id="dbt-seeds", + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + command="seed" + ) + start_node.set_upstream(first_node) + + manifest = self.manifest() + dbt_tasks = {} + # create all the jobs. 
granular as one job per model/table + for key, node in manifest.nodes.items(): + if tag and tag not in node.tags: + self.log.debug( + f"Skipping node:{node.name} because it dont have desired desired-tag={tag} node-tags={node.tags}") + # LOG DEBUG OR TRACE here print(f" tag:{tag} NOT in {node.tags} SKIPP {node.name}") + continue # skip if the node don't have the desired tag + + if resource_type == "test" and not str(node.name).startswith("source_"): + if node.resource_type == "test": + dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag, + task_id=node.unique_id.rsplit('.', 1)[0], + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + command="test", + select=node.name + ) + if node.resource_type == "model": + dbt_tasks[node.unique_id] = EmptyOperator(dag=dag, task_id=node.unique_id) + + if node.resource_type == "model" and resource_type in ["all", "model"]: + # NOTE `build` command also runs the tests that's why are skipping tests for models below + dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag, + task_id=node.unique_id, + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + command="build", + select=node.alias + ) + + if node.resource_type == "test" and str(node.name).startswith("source_") and resource_type in ["all", + "test"]: + # we are skipping model tests because they are included above with model execution( `build` command) + # source table tests + dbt_tasks[node.unique_id] = OpenDbtExecutorOperator(dag=dag, + task_id=node.unique_id.rsplit('.', 1)[0], + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + command="test", + select=node.name + ) + + # set upstream dependencies using dbt dependencies + for key, node in manifest.nodes.items(): + if tag and tag not in node.tags: + continue # skip if the node don't have the desired tag + if node.unique_id in dbt_tasks: # node.resource_type == "model" or True or + task = dbt_tasks[node.unique_id] + if node.depends_on_nodes: + for upstream_id in node.depends_on_nodes: + if upstream_id in dbt_tasks: + self.log.debug(f"Setting upstream of {task.task_id} -> {upstream_id}") + task.set_upstream(dbt_tasks[upstream_id]) + + singular_tests = None + if run_singular_tests: + singular_tests = OpenDbtExecutorOperator(dag=dag, + task_id=f"{self.project_dir.name}_singular_tests", + project_dir=self.project_dir, + profiles_dir=self.profiles_dir, + target=self.target, + command="test", + select="test_type:singular" + ) + for k, task in dbt_tasks.items(): + if not task.downstream_task_ids: + # set downstream dependencies for the end nodes. 
+ self.log.debug(f"Setting downstream of {task.task_id} -> {end_node.task_id}") + + if run_singular_tests and singular_tests: + task.set_downstream(singular_tests) + else: + task.set_downstream(end_node) + + if not task.upstream_task_ids: + # set upstream dependencies for the nodes which don't have upstream dependency + self.log.debug(f"Setting upstream of {task.task_id} -> {start_node}") + task.set_upstream(start_node) + + if run_singular_tests: + singular_tests.set_downstream(end_node) + return start_node, end_node diff --git a/opendbt/client.py b/opendbt/client.py new file mode 100644 index 0000000..9a837e5 --- /dev/null +++ b/opendbt/client.py @@ -0,0 +1,118 @@ +import importlib +from multiprocessing.context import SpawnContext +from typing import Optional + +import dbt +from dbt.adapters.base.plugin import AdapterPlugin +from dbt.adapters.contracts.connection import AdapterRequiredConfig +from dbt.adapters.events.types import ( + AdapterRegistered, +) +from dbt.adapters.factory import FACTORY, Adapter +from dbt.cli.main import dbtRunner as DbtCliRunner +from dbt.cli.main import dbtRunnerResult +from dbt.contracts.results import RunResult +from dbt.exceptions import DbtRuntimeError +from dbt_common.events.base_types import EventLevel +from dbt_common.events.functions import fire_event + +DBT_CUSTOM_ADAPTER_VAR = 'dbt_custom_adapter' + + +def get_custom_adapter_config_value(self, config: AdapterRequiredConfig) -> str: + # FIRST: it's set as cli value: dbt run --vars {'dbt_custom_adapter': 'custom_adapters.DuckDBAdapterV1Custom'} + if hasattr(config, 'cli_vars') and DBT_CUSTOM_ADAPTER_VAR in config.cli_vars: + custom_adapter_class_name: str = config.cli_vars[DBT_CUSTOM_ADAPTER_VAR] + if custom_adapter_class_name and custom_adapter_class_name.strip(): + return custom_adapter_class_name + # SECOND: it's set inside dbt_project.yml + if hasattr(config, 'vars') and DBT_CUSTOM_ADAPTER_VAR in config.vars.to_dict(): + custom_adapter_class_name: str = config.vars.to_dict()[DBT_CUSTOM_ADAPTER_VAR] + if custom_adapter_class_name and custom_adapter_class_name.strip(): + return custom_adapter_class_name + + return None + + +def get_custom_adapter_class_by_name(self, custom_adapter_class_name: str): + if "." not in custom_adapter_class_name: + raise ValueError(f"Unexpected adapter class name: `{custom_adapter_class_name}` ," + f"Expecting something like:`my.sample.library.MyAdapterClass`") + + __module, __class = custom_adapter_class_name.rsplit('.', 1) + try: + user_adapter_module = importlib.import_module(__module) + user_adapter_class = getattr(user_adapter_module, __class) + return user_adapter_class + except ModuleNotFoundError as mnfe: + raise Exception(f"Module of provided adapter not found, provided: {custom_adapter_class_name}") from mnfe + + +# ================================================================================================================ +# Add further extension below, extend dbt using Monkey Patching! 
+# ================================================================================================================
+def register_adapter_v2(
+        self,
+        config: AdapterRequiredConfig,
+        mp_context: SpawnContext,
+        adapter_registered_log_level: Optional[EventLevel] = EventLevel.INFO,
+) -> None:
+    adapter_name = config.credentials.type
+    adapter_type = self.get_adapter_class_by_name(adapter_name)
+    adapter_version = self._adapter_version(adapter_name)
+    # ==== CUSTOM CODE ====
+    custom_adapter_class_name: str = self.get_custom_adapter_config_value(config)
+    if custom_adapter_class_name and custom_adapter_class_name.strip():
+        # OVERRIDE DEFAULT ADAPTER WITH THE USER-PROVIDED ADAPTER CLASS
+        adapter_type = self.get_custom_adapter_class_by_name(custom_adapter_class_name)
+    # ==== END CUSTOM CODE ====
+    fire_event(
+        AdapterRegistered(adapter_name=adapter_name, adapter_version=adapter_version),
+        level=adapter_registered_log_level,
+    )
+    with self.lock:
+        if adapter_name in self.adapters:
+            # this shouldn't really happen...
+            return
+
+        adapter: Adapter = adapter_type(config, mp_context)  # type: ignore
+        self.adapters[adapter_name] = adapter
+
+
+# ================================================================================================================
+# Monkey patching! Override the dbt AdapterContainer.register_adapter method with the version above.
+# ================================================================================================================
+# dbt.adapters.factory.AdapterContainer.get_adapter_class_by_name = get_adapter_class_by_name
+dbt.adapters.factory.AdapterContainer.get_custom_adapter_config_value = get_custom_adapter_config_value
+dbt.adapters.factory.AdapterContainer.get_custom_adapter_class_by_name = get_custom_adapter_class_by_name
+dbt.adapters.factory.AdapterContainer.register_adapter = register_adapter_v2
+
+
+class OpenDbtCli:
+
+    @staticmethod
+    def run(args: list) -> dbtRunnerResult:
+        """
+        Run dbt with the given arguments.
+
+        :param args: The arguments to pass to dbt.
+        :return: The result of the dbt run.
+        """
+        # https://docs.getdbt.com/reference/programmatic-invocations
+        dbt = DbtCliRunner()
+        result: dbtRunnerResult = dbt.invoke(args)
+        if result.success:
+            return result
+
+        # surface the failing result message so the user can see and re-run the failing query
+        rer: RunResult
+
+        _exception = result.exception if result.exception else None
+        if (_exception is None and result.result and result.result.results and
+                len(result.result.results) > 0 and result.result.results[0].message
+        ):
+            _exception = DbtRuntimeError(result.result.results[0].message)
+
+        if _exception is None:
+            _exception = DbtRuntimeError("DBT execution failed!")
+        raise _exception
diff --git a/opendbt/macros/execute.sql b/opendbt/macros/execute.sql
new file mode 100644
index 0000000..4a6e6c7
--- /dev/null
+++ b/opendbt/macros/execute.sql
@@ -0,0 +1,42 @@
+{% materialization execute, supported_languages=['sql']%}
+
+{#
+  Modified version of the table materialization: it executes the compiled SQL statement as-is.
+#} + + {%- set identifier = model['alias'] -%} + {%- set language = model['language'] -%} + + {% set grant_config = config.get('grants') %} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, + schema=schema, + database=database, type='table') -%} + + {{ run_hooks(pre_hooks) }} + + {{ log(msg="Executing SQL: " ~ compiled_code ~ "", info=True) }} + {% call statement('main', language=language, fetch_result=False) -%} + {{ compiled_code }} + {%- endcall %} + + {%- set result = load_result('main') -%} + {{ log(msg="Execution result " ~ result ~ "", info=True) }} + {# DISABLED + {%- set result_data = result['data'] -%} + {{ log(msg="Execution result_data " ~ result_data ~ "", info=True) }} + {%- set result_status = result['response'] -%} + {{ log(msg="Execution result_status " ~ result_status ~ "", info=True) }} + END-DISABLED #} + + {{ run_hooks(post_hooks) }} + + {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/opendbt/macros/executepython.sql b/opendbt/macros/executepython.sql new file mode 100644 index 0000000..58159f8 --- /dev/null +++ b/opendbt/macros/executepython.sql @@ -0,0 +1,26 @@ +{% materialization executepython, supported_languages=['python']%} + + {%- set identifier = model['alias'] -%} + {%- set language = model['language'] -%} + + {% set grant_config = config.get('grants') %} + + {%- set old_relation = adapter.get_relation(database=database, schema=schema, identifier=identifier) -%} + {%- set target_relation = api.Relation.create(identifier=identifier, + schema=schema, + database=database, type='table') -%} + {{ run_hooks(pre_hooks) }} + + {% call noop_statement(name='main', message='Executed Python', code=compiled_code, rows_affected=-1, res=None) %} + {%- set res = adapter.submit_local_python_job(model, compiled_code) -%} + {% endcall %} + {{ run_hooks(post_hooks) }} + + {% set should_revoke = should_revoke(old_relation, full_refresh_mode=True) %} + {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} + + {% do persist_docs(target_relation, model) %} + + {{ return({'relations': [target_relation]}) }} + +{% endmaterialization %} diff --git a/setup.py b/setup.py new file mode 100644 index 0000000..ee3a2b2 --- /dev/null +++ b/setup.py @@ -0,0 +1,30 @@ +import os +import pathlib + +from setuptools import setup, find_packages + +setup_py_dir = pathlib.Path(__file__).parent +os.chdir(setup_py_dir) + +setup( + name='opendbt', + entry_points={ + 'console_scripts': [ + 'opendbt = opendbt:main', + ], + }, + version='2.2.0', + packages=find_packages(), + author="Memiiso Organization", + description='Python opendbt', + long_description=pathlib.Path(__file__).parent.joinpath("README.md").read_text(encoding="utf-8"), + long_description_content_type="text/markdown", + url='https://github.com/memiiso/opendbt', + download_url='https://github.com/memiiso/opendbt/archive/master.zip', + include_package_data=True, + license="Apache License 2.0", + test_suite='tests', + # @TODO make airflow optional dependency + install_requires=["dbt-core", "apache-airflow", "dbt-duckdb", "testcontainers~=3.7.1"], + python_requires='>=3.8' +) diff --git a/tests/custom_adapters.py b/tests/custom_adapters.py 
new file mode 100644 index 0000000..e6503a9 --- /dev/null +++ b/tests/custom_adapters.py @@ -0,0 +1,36 @@ +import tempfile +from multiprocessing.context import SpawnContext +from typing import Dict + +from dbt.adapters.base import available +from dbt.adapters.duckdb import DuckDBAdapter + +from opendbt import Utils + + +class DuckDBAdapterV1Custom(DuckDBAdapter): + def __init__(self, config, mp_context: SpawnContext) -> None: + print(f"WARNING: Using User Provided DBT Adapter: {type(self).__module__}.{type(self).__name__}") + super().__init__(config=config, mp_context=mp_context) + raise Exception("Custom user defined test adapter activated, exception") + + +class DuckDBAdapterV2Custom(DuckDBAdapter): + def __init__(self, config, mp_context: SpawnContext) -> None: + print(f"WARNING: Using User Provided DBT Adapter: {type(self).__module__}.{type(self).__name__}") + super().__init__(config=config, mp_context=mp_context) + + @available + def submit_local_python_job(self, parsed_model: Dict, compiled_code: str): + model_unique_id = parsed_model.get('unique_id') + __py_code = f""" +{compiled_code} + +# NOTE this is local python execution so session is None +model(dbt=dbtObj(None), session=None) + """ + with tempfile.NamedTemporaryFile(suffix=f'__{model_unique_id}.py', delete=False) as fp: + fp.write(__py_code.encode('utf-8')) + fp.close() + print(f"Created temp py file {fp.name}") + Utils.runcommand(command=['python', fp.name]) diff --git a/tests/resources/airflow/Dockerfile b/tests/resources/airflow/Dockerfile new file mode 100644 index 0000000..6af7107 --- /dev/null +++ b/tests/resources/airflow/Dockerfile @@ -0,0 +1,12 @@ +FROM apache/airflow:slim-2.5.2-python3.8 +LABEL authors="opendbt" + +# install additional packages +COPY --chown=airflow:airflow opendbt /tmp/opendbt/opendbt +COPY --chown=airflow:airflow README.md /tmp/opendbt/README.md +COPY --chown=airflow:airflow setup.py /tmp/opendbt/setup.py +COPY --chown=airflow:airflow tests/resources/dbttest /opt/dbttest +COPY --chown=airflow:airflow opendbt/macros /opt/dbttest/macros + +RUN pip install -e /tmp/opendbt/ +EXPOSE 8080 diff --git a/tests/resources/airflow/airflow/airflow.cfg b/tests/resources/airflow/airflow/airflow.cfg new file mode 100644 index 0000000..2b6fcc7 --- /dev/null +++ b/tests/resources/airflow/airflow/airflow.cfg @@ -0,0 +1,308 @@ +# Default airflow config of the docker image +[core] +dags_folder = /opt/airflow/dags +hostname_callable = airflow.utils.net.getfqdn +default_timezone = utc +executor = SequentialExecutor +parallelism = 32 +max_active_tasks_per_dag = 16 +dags_are_paused_at_creation = True +max_active_runs_per_dag = 16 +load_examples = True +plugins_folder = /opt/airflow/plugins +execute_tasks_new_python_interpreter = False +fernet_key = +donot_pickle = True +dagbag_import_timeout = 30.0 +dagbag_import_error_tracebacks = True +dagbag_import_error_traceback_depth = 2 +dag_file_processor_timeout = 50 +task_runner = StandardTaskRunner +default_impersonation = +security = +unit_test_mode = False +enable_xcom_pickling = False +allowed_deserialization_classes = airflow\..* +killed_task_cleanup_time = 60 +dag_run_conf_overrides_params = True +dag_discovery_safe_mode = True +dag_ignore_file_syntax = regexp +default_task_retries = 0 +default_task_retry_delay = 300 +default_task_weight_rule = downstream +default_task_execution_timeout = +min_serialized_dag_update_interval = 30 +compress_serialized_dags = False +min_serialized_dag_fetch_interval = 10 +max_num_rendered_ti_fields_per_task = 30 +check_slas = True 
+xcom_backend = airflow.models.xcom.BaseXCom +lazy_load_plugins = True +lazy_discover_providers = True +hide_sensitive_var_conn_fields = True +sensitive_var_conn_names = +default_pool_task_slot_count = 128 +max_map_length = 1024 +daemon_umask = 0o077 +[database] +sql_alchemy_conn = sqlite:////opt/airflow/airflow.db +sql_engine_encoding = utf-8 +sql_alchemy_pool_enabled = True +sql_alchemy_pool_size = 5 +sql_alchemy_max_overflow = 10 +sql_alchemy_pool_recycle = 1800 +sql_alchemy_pool_pre_ping = True +sql_alchemy_schema = +load_default_connections = True +max_db_retries = 3 +[logging] +base_log_folder = /opt/airflow/logs +remote_logging = False +remote_log_conn_id = +google_key_path = +remote_base_log_folder = +encrypt_s3_logs = False +logging_level = INFO +celery_logging_level = +fab_logging_level = WARNING +logging_config_class = +colored_console_log = True +colored_log_format = [%%(blue)s%%(asctime)s%%(reset)s] {%%(blue)s%%(filename)s:%%(reset)s%%(lineno)d} %%(log_color)s%%(levelname)s%%(reset)s - %%(log_color)s%%(message)s%%(reset)s +colored_formatter_class = airflow.utils.log.colored_log.CustomTTYColoredFormatter +log_format = [%%(asctime)s] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s +simple_log_format = %%(asctime)s %%(levelname)s - %%(message)s +dag_processor_log_target = file +dag_processor_log_format = [%%(asctime)s] [SOURCE:DAG_PROCESSOR] {%%(filename)s:%%(lineno)d} %%(levelname)s - %%(message)s +log_formatter_class = airflow.utils.log.timezone_aware.TimezoneAware +task_log_prefix_template = +log_filename_template = dag_id={{ ti.dag_id }}/run_id={{ ti.run_id }}/task_id={{ ti.task_id }}/{%% if ti.map_index >= 0 %%}map_index={{ ti.map_index }}/{%% endif %%}attempt={{ try_number }}.log +log_processor_filename_template = {{ filename }}.log +dag_processor_manager_log_location = /opt/airflow/logs/dag_processor_manager/dag_processor_manager.log +task_log_reader = task +extra_logger_names = +worker_log_server_port = 8793 +[metrics] +statsd_on = False +statsd_host = localhost +statsd_port = 8125 +statsd_prefix = airflow +statsd_allow_list = +stat_name_handler = +statsd_datadog_enabled = False +statsd_datadog_tags = + +[secrets] +# backend = airflow.providers.hashicorp.secrets.vault.VaultBackend +# backend_kwargs = {"connections_path": "connections", "variables_path": "variables", "mount_point": "airflow", "url": "http://127.0.0.1:8200"} +backend = +backend_kwargs = + +[cli] +api_client = airflow.api.client.local_client +endpoint_url = http://localhost:8080 +[debug] +fail_fast = False +[api] +enable_experimental_api = False +auth_backends = airflow.api.auth.backend.session +maximum_page_limit = 100 +fallback_page_limit = 100 +google_oauth2_audience = +google_key_path = +access_control_allow_headers = +access_control_allow_methods = +access_control_allow_origins = +[lineage] +backend = +[atlas] +sasl_enabled = False +host = +port = 21000 +username = +password = +[operators] +default_owner = airflow +default_cpus = 1 +default_ram = 512 +default_disk = 512 +default_gpus = 0 +default_queue = default +allow_illegal_arguments = False +[hive] +default_hive_mapred_queue = +[webserver] +base_url = http://localhost:8080 +default_ui_timezone = UTC +web_server_host = 0.0.0.0 +web_server_port = 8080 +web_server_ssl_cert = +web_server_ssl_key = +session_backend = database +web_server_master_timeout = 120 +web_server_worker_timeout = 120 +worker_refresh_batch_size = 1 +worker_refresh_interval = 6000 +reload_on_plugin_change = False +secret_key = KpWSnDmjuxdEAVePCn1T4Q== +workers = 4 
+worker_class = sync +access_logfile = - +error_logfile = - +access_logformat = +expose_config = False +expose_hostname = False +expose_stacktrace = False +dag_default_view = grid +dag_orientation = LR +log_fetch_timeout_sec = 5 +log_fetch_delay_sec = 2 +log_auto_tailing_offset = 30 +log_animation_speed = 1000 +hide_paused_dags_by_default = False +page_size = 100 +navbar_color = #fff +default_dag_run_display_number = 25 +enable_proxy_fix = False +proxy_fix_x_for = 1 +proxy_fix_x_proto = 1 +proxy_fix_x_host = 1 +proxy_fix_x_port = 1 +proxy_fix_x_prefix = 1 +cookie_secure = False +cookie_samesite = Lax +default_wrap = False +x_frame_enabled = True +show_recent_stats_for_completed_runs = True +update_fab_perms = True +session_lifetime_minutes = 43200 +instance_name_has_markup = False +auto_refresh_interval = 3 +warn_deployment_exposure = True +audit_view_excluded_events = gantt,landing_times,tries,duration,calendar,graph,grid,tree,tree_data +[email] +email_backend = airflow.utils.email.send_email_smtp +email_conn_id = smtp_default +default_email_on_retry = True +default_email_on_failure = True +[smtp] +smtp_host = localhost +smtp_starttls = True +smtp_ssl = False +smtp_port = 25 +smtp_mail_from = airflow@example.com +smtp_timeout = 30 +smtp_retry_limit = 5 +[sentry] +sentry_on = false +sentry_dsn = +[local_kubernetes_executor] +kubernetes_queue = kubernetes +[celery_kubernetes_executor] +kubernetes_queue = kubernetes +[celery] +celery_app_name = airflow.executors.celery_executor +worker_concurrency = 16 +worker_prefetch_multiplier = 1 +worker_enable_remote_control = true +broker_url = redis://redis:6379/0 +flower_host = 0.0.0.0 +flower_url_prefix = +flower_port = 5555 +flower_basic_auth = +sync_parallelism = 0 +celery_config_options = airflow.config_templates.default_celery.DEFAULT_CELERY_CONFIG +ssl_active = False +ssl_key = +ssl_cert = +ssl_cacert = +pool = prefork +operation_timeout = 1.0 +task_track_started = True +task_adoption_timeout = 600 +stalled_task_timeout = 0 +task_publish_max_retries = 3 +worker_precheck = False +[celery_broker_transport_options] +[dask] +cluster_address = 127.0.0.1:8786 +tls_ca = +tls_cert = +tls_key = +[scheduler] +job_heartbeat_sec = 5 +scheduler_heartbeat_sec = 5 +num_runs = -1 +scheduler_idle_sleep_time = 1 +min_file_process_interval = 30 +parsing_cleanup_interval = 60 +dag_dir_list_interval = 300 +print_stats_interval = 30 +pool_metrics_interval = 5.0 +scheduler_health_check_threshold = 30 +enable_health_check = False +scheduler_health_check_server_port = 8974 +orphaned_tasks_check_interval = 300.0 +child_process_log_directory = /opt/airflow/logs/scheduler +scheduler_zombie_task_threshold = 300 +zombie_detection_interval = 10.0 +catchup_by_default = True +ignore_first_depends_on_past_by_default = True +max_tis_per_query = 512 +use_row_level_locking = True +max_dagruns_to_create_per_loop = 10 +max_dagruns_per_loop_to_schedule = 20 +schedule_after_task_execution = True +parsing_processes = 2 +file_parsing_sort_mode = modified_time +standalone_dag_processor = False +max_callbacks_per_loop = 20 +dag_stale_not_seen_duration = 600 +use_job_schedule = True +allow_trigger_in_future = False +trigger_timeout_check_interval = 15 +[triggerer] +default_capacity = 1000 +[kerberos] +ccache = /tmp/airflow_krb5_ccache +principal = airflow +reinit_frequency = 3600 +kinit_path = kinit +keytab = airflow.keytab +forwardable = True +include_ip = True +[elasticsearch] +host = +log_id_template = {dag_id}-{task_id}-{run_id}-{map_index}-{try_number} +end_of_log_mark = end_of_log 
+frontend = +write_stdout = False +json_format = False +json_fields = asctime, filename, lineno, levelname, message +host_field = host +offset_field = offset +[elasticsearch_configs] +use_ssl = False +verify_certs = True +[kubernetes_executor] +pod_template_file = +worker_container_repository = +worker_container_tag = +namespace = default +delete_worker_pods = True +delete_worker_pods_on_failure = False +worker_pods_creation_batch_size = 1 +multi_namespace_mode = False +in_cluster = True +kube_client_request_args = +delete_option_kwargs = +enable_tcp_keepalive = True +tcp_keep_idle = 120 +tcp_keep_intvl = 30 +tcp_keep_cnt = 6 +verify_ssl = True +worker_pods_pending_timeout = 300 +worker_pods_pending_timeout_check_interval = 120 +worker_pods_queued_check_interval = 60 +worker_pods_pending_timeout_batch_size = 100 +[sensors] +default_timeout = 604800 diff --git a/tests/resources/airflow/airflow/webserver_config.py b/tests/resources/airflow/airflow/webserver_config.py new file mode 100644 index 0000000..76c8687 --- /dev/null +++ b/tests/resources/airflow/airflow/webserver_config.py @@ -0,0 +1,114 @@ +"""Default configuration for the Airflow webserver.""" +from __future__ import annotations + +import os + +from flask_appbuilder.const import AUTH_DB + +# from airflow.www.fab_security.manager import AUTH_LDAP +# from airflow.www.fab_security.manager import AUTH_OAUTH +# from airflow.www.fab_security.manager import AUTH_OID +# from airflow.www.fab_security.manager import AUTH_REMOTE_USER + + +basedir = os.path.abspath(os.path.dirname(__file__)) + +# Flask-WTF flag for CSRF +WTF_CSRF_ENABLED = True +WTF_CSRF_TIME_LIMIT = None + +# ---------------------------------------------------- +# AUTHENTICATION CONFIG +# ---------------------------------------------------- +# For details on how to set up each of the following authentication, see +# http://flask-appbuilder.readthedocs.io/en/latest/security.html# authentication-methods +# for details. 
+ +# The authentication type +# AUTH_OID : Is for OpenID +# AUTH_DB : Is for database +# AUTH_LDAP : Is for LDAP +# AUTH_REMOTE_USER : Is for using REMOTE_USER from web server +# AUTH_OAUTH : Is for OAuth +AUTH_TYPE = AUTH_DB + +# Uncomment to setup Full admin role name +# AUTH_ROLE_ADMIN = 'Admin' + +# Uncomment and set to desired role to enable access without authentication +AUTH_ROLE_PUBLIC = 'Admin' + +# Will allow user self registration +# AUTH_USER_REGISTRATION = True + +# The recaptcha it's automatically enabled for user self registration is active and the keys are necessary +# RECAPTCHA_PRIVATE_KEY = PRIVATE_KEY +# RECAPTCHA_PUBLIC_KEY = PUBLIC_KEY + +# Config for Flask-Mail necessary for user self registration +# MAIL_SERVER = 'smtp.gmail.com' +# MAIL_USE_TLS = True +# MAIL_USERNAME = 'yourappemail@gmail.com' +# MAIL_PASSWORD = 'passwordformail' +# MAIL_DEFAULT_SENDER = 'sender@gmail.com' + +# The default user self registration role +# AUTH_USER_REGISTRATION_ROLE = "Public" + +# When using OAuth Auth, uncomment to setup provider(s) info +# Google OAuth example: +# OAUTH_PROVIDERS = [{ +# 'name':'google', +# 'token_key':'access_token', +# 'icon':'fa-google', +# 'remote_app': { +# 'api_base_url':'https://www.googleapis.com/oauth2/v2/', +# 'client_kwargs':{ +# 'scope': 'email profile' +# }, +# 'access_token_url':'https://accounts.google.com/o/oauth2/token', +# 'authorize_url':'https://accounts.google.com/o/oauth2/auth', +# 'request_token_url': None, +# 'client_id': GOOGLE_KEY, +# 'client_secret': GOOGLE_SECRET_KEY, +# } +# }] + +# When using LDAP Auth, setup the ldap server +# AUTH_LDAP_SERVER = "ldap://ldapserver.new" + +# When using OpenID Auth, uncomment to setup OpenID providers. +# example for OpenID authentication +# OPENID_PROVIDERS = [ +# { 'name': 'Yahoo', 'url': 'https://me.yahoo.com' }, +# { 'name': 'AOL', 'url': 'http://openid.aol.com/' }, +# { 'name': 'Flickr', 'url': 'http://www.flickr.com/' }, +# { 'name': 'MyOpenID', 'url': 'https://www.myopenid.com' }] + +# ---------------------------------------------------- +# Theme CONFIG +# ---------------------------------------------------- +# Flask App Builder comes up with a number of predefined themes +# that you can use for Apache Airflow. +# http://flask-appbuilder.readthedocs.io/en/latest/customizing.html#changing-themes +# Please make sure to remove "navbar_color" configuration from airflow.cfg +# in order to fully utilize the theme. 
(or use that property in conjunction with theme) +# APP_THEME = "bootstrap-theme.css" # default bootstrap +# APP_THEME = "amelia.css" +# APP_THEME = "cerulean.css" +# APP_THEME = "cosmo.css" +# APP_THEME = "cyborg.css" +# APP_THEME = "darkly.css" +# APP_THEME = "flatly.css" +# APP_THEME = "journal.css" +# APP_THEME = "lumen.css" +# APP_THEME = "paper.css" +# APP_THEME = "readable.css" +# APP_THEME = "sandstone.css" +# APP_THEME = "simplex.css" +# APP_THEME = "slate.css" +# APP_THEME = "solar.css" +# APP_THEME = "spacelab.css" +# APP_THEME = "superhero.css" +# APP_THEME = "united.css" +# APP_THEME = "yeti.css" diff --git a/tests/resources/airflow/dags/dbt_workflow.py b/tests/resources/airflow/dags/dbt_workflow.py new file mode 100644 index 0000000..2edf045 --- /dev/null +++ b/tests/resources/airflow/dags/dbt_workflow.py @@ -0,0 +1,32 @@ +from pathlib import Path + +from airflow.operators.empty import EmptyOperator +from airflow.utils.dates import days_ago + +from airflow import DAG +from opendbt.airflow import OpenDbtAirflowProject + +default_args = { + 'owner': 'airflow', + 'depends_on_past': False, + 'email_on_failure': False, + 'email_on_retry': False, + 'retries': 1 +} + +with DAG( + dag_id='dbt_workflow', + default_args=default_args, + description='DAG To run dbt', + schedule_interval=None, + start_date=days_ago(3), + catchup=False, + max_active_runs=1 +) as dag: + start = EmptyOperator(task_id="start") + end = EmptyOperator(task_id="end") + + DBTTEST_DIR = Path("/opt/dbttest") + + p = OpenDbtAirflowProject(project_dir=DBTTEST_DIR, profiles_dir=DBTTEST_DIR, target='dev') + p.load_dbt_tasks(dag=dag, start_node=start, end_node=end) diff --git a/tests/resources/airflow/docker-compose.yaml b/tests/resources/airflow/docker-compose.yaml new file mode 100644 index 0000000..4fb10db --- /dev/null +++ b/tests/resources/airflow/docker-compose.yaml @@ -0,0 +1,22 @@ +version: '2' + +services: + airflow: + build: + dockerfile: tests/resources/airflow/Dockerfile + # NOTE The path can be absolute or relative. + # If it is relative, it is resolved from the Compose file's parent folder. + context: ./../../../ + image: opendbt_airflow + entrypoint: airflow standalone + volumes: + # NOTE The path can be absolute or relative. + - ./airflow/webserver_config.py:/opt/airflow/webserver_config.py + - ./airflow/airflow.cfg:/opt/airflow/airflow.cfg + - ./dags:/opt/airflow/dags:rw +# - ./../../../:/opt/airflow/plugins:rw + environment: + - AIRFLOW__WEBSERVER__INSTANCE_NAME=LOCAL + - AIRFLOW_ENVIRONMENT=LOCAL + ports: + - "8080" diff --git a/tests/resources/dbttest/.gitignore b/tests/resources/dbttest/.gitignore new file mode 100644 index 0000000..49f147c --- /dev/null +++ b/tests/resources/dbttest/.gitignore @@ -0,0 +1,4 @@ + +target/ +dbt_packages/ +logs/ diff --git a/tests/resources/dbttest/README.md b/tests/resources/dbttest/README.md new file mode 100644 index 0000000..1616d3d --- /dev/null +++ b/tests/resources/dbttest/README.md @@ -0,0 +1,3 @@ +### DBT Test Project + +Here you can see an example materialization models \ No newline at end of file diff --git a/tests/resources/dbttest/analyses/.gitkeep b/tests/resources/dbttest/analyses/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/dbttest/dbt_project.yml b/tests/resources/dbttest/dbt_project.yml new file mode 100644 index 0000000..08fd0b0 --- /dev/null +++ b/tests/resources/dbttest/dbt_project.yml @@ -0,0 +1,40 @@ +# Name your project! Project names should contain only lowercase characters +# and underscores. 
A good package name should reflect your organization's +# name or the intended use of these models +name: 'dbttest' +version: '1.0.0' + +# This setting configures which "profile" dbt uses for this project. +profile: 'dbttest' + +# These configurations specify where dbt should look for different types of files. +# The `model-paths` config, for example, states that models in this project can be +# found in the "models/" directory. You probably won't need to change these! +model-paths: [ "models" ] +analysis-paths: [ "analyses" ] +test-paths: [ "tests" ] +seed-paths: [ "seeds" ] +# include "opendbt/macros/" macros! +macro-paths: [ "macros", "../../../opendbt/macros/" ] +snapshot-paths: [ "snapshots" ] + +clean-targets: # directories to be removed by `dbt clean` + - "target" + - "dbt_packages" + + +# Configuring models +# Full documentation: https://docs.getdbt.com/docs/configuring-models + +# In this example config, we tell dbt to build all models in the example/ +# directory as views. These settings can be overridden in the individual model +# files using the `{{ config(...) }}` macro. +models: + dbttest: + # Config indicated by + and applies to all files under models/example/ + example: + +materialized: view + +vars: + my_custom_aaaaa: "XXX" # WORKS + dbt_custom_adapter: null \ No newline at end of file diff --git a/tests/resources/dbttest/macros/.gitkeep b/tests/resources/dbttest/macros/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/dbttest/models/example/my_first_dbt_model.sql b/tests/resources/dbttest/models/example/my_first_dbt_model.sql new file mode 100644 index 0000000..f31a12d --- /dev/null +++ b/tests/resources/dbttest/models/example/my_first_dbt_model.sql @@ -0,0 +1,27 @@ + +/* + Welcome to your first dbt model! + Did you know that you can also configure models directly within SQL files? 
+ This will override configurations stated in dbt_project.yml + + Try changing "table" to "view" below +*/ + +{{ config(materialized='table') }} + +with source_data as ( + + select 1 as id + union all + select null as id + +) + +select * +from source_data + +/* + Uncomment the line below to remove records with null `id` values +*/ + +-- where id is not null diff --git a/tests/resources/dbttest/models/example/my_second_dbt_model.sql b/tests/resources/dbttest/models/example/my_second_dbt_model.sql new file mode 100644 index 0000000..c91f879 --- /dev/null +++ b/tests/resources/dbttest/models/example/my_second_dbt_model.sql @@ -0,0 +1,6 @@ + +-- Use the `ref` function to select from other models + +select * +from {{ ref('my_first_dbt_model') }} +where id = 1 diff --git a/tests/resources/dbttest/models/example/schema.yml b/tests/resources/dbttest/models/example/schema.yml new file mode 100644 index 0000000..9730b70 --- /dev/null +++ b/tests/resources/dbttest/models/example/schema.yml @@ -0,0 +1,21 @@ + +version: 2 + +models: + - name: my_first_dbt_model + description: "A starter dbt model" + columns: + - name: id + description: "The primary key for this table" + data_tests: + - unique + - not_null + + - name: my_second_dbt_model + description: "A starter dbt model" + columns: + - name: id + description: "The primary key for this table" + data_tests: + - unique + - not_null diff --git a/tests/resources/dbttest/models/my_execute_dbt_model.sql b/tests/resources/dbttest/models/my_execute_dbt_model.sql new file mode 100644 index 0000000..31293b2 --- /dev/null +++ b/tests/resources/dbttest/models/my_execute_dbt_model.sql @@ -0,0 +1,7 @@ +{{ config(materialized='execute') }} + + +create or replace table my_execute_dbt_model +as + +select 123 as column1 \ No newline at end of file diff --git a/tests/resources/dbttest/models/my_executepython_dbt_model.py b/tests/resources/dbttest/models/my_executepython_dbt_model.py new file mode 100644 index 0000000..1c724c7 --- /dev/null +++ b/tests/resources/dbttest/models/my_executepython_dbt_model.py @@ -0,0 +1,22 @@ +import os +import platform + +from dbt import version + + +def print_info(): + _str = f"name:{os.name}, system:{platform.system()} release:{platform.release()}" + _str += f"\npython version:{platform.python_version()}, dbt:{version.__version__}" + print(_str) + + +def model(dbt, session): + dbt.config(materialized="executepython") + print("==================================================") + print("========IM LOCALLY EXECUTED PYTHON MODEL==========") + print("==================================================") + print_info() + print("==================================================") + print("===============MAKE DBT GREAT AGAIN===============") + print("==================================================") + return None diff --git a/tests/resources/dbttest/profiles.yml b/tests/resources/dbttest/profiles.yml new file mode 100644 index 0000000..0c438aa --- /dev/null +++ b/tests/resources/dbttest/profiles.yml @@ -0,0 +1,15 @@ +dbttest: + outputs: + dev: + type: duckdb + adapter: my.dbt.custom.OpenAdapterXXX + path: dev.duckdb + threads: 1 + + prod: + type: duckdb + adapter: my.dbt.custom.OpenAdapterXXX + path: prod.duckdb + threads: 4 + + target: dev diff --git a/tests/resources/dbttest/seeds/.gitkeep b/tests/resources/dbttest/seeds/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/resources/dbttest/snapshots/.gitkeep b/tests/resources/dbttest/snapshots/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git 
a/tests/resources/dbttest/tests/.gitkeep b/tests/resources/dbttest/tests/.gitkeep new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_airflow.py b/tests/test_airflow.py new file mode 100644 index 0000000..6072b56 --- /dev/null +++ b/tests/test_airflow.py @@ -0,0 +1,60 @@ +import subprocess +import unittest +from pathlib import Path +from time import sleep + +from testcontainers.compose import DockerCompose +from testcontainers.core.waiting_utils import wait_for_logs + + +@unittest.skip("Manual test") +class TestAirflowBase(unittest.TestCase): + """ + Test class used to do airflow tests. + uses airflow docker image and mounts current code into it. + login is disabled all users can access the UI as Admin. Airflow is set up as Public + """ + _compose: DockerCompose = None + resources_dir = Path(__file__).parent.joinpath('resources') + + @classmethod + def setUpClass(cls): + cls._compose = DockerCompose(filepath=cls.resources_dir.joinpath('airflow').as_posix(), + compose_file_name="docker-compose.yaml", + # build=True + ) + cls._compose.stop() + cls._compose.start() + # cls._compose.wait_for(url="http://localhost:8080/health") + wait_for_logs(cls._compose, 'Added Permission menu access on Configurations') + wait_for_logs(cls._compose, 'Added user admin') + + @classmethod + def tearDownClass(cls): + print("Running tearDownClass") + if cls._compose: + cls._compose.stop() + + def __exit__(self, exc_type, exc_val, traceback): + if self._compose: + self._compose.stop() + + def _get_service_port(self, service, port): + port_cmd = self._compose.docker_compose_command() + ["port", service, str(port)] + output = subprocess.check_output(port_cmd, cwd=self._compose.filepath).decode("utf-8") + result = str(output).rstrip().split(":") + if len(result) != 2: + raise Exception(f"Unexpected service info {output}. expecting `host:1234`") + return result[-1] + + def test_start_airflow_local_and_wait(self): + """ + used to deploy the code inside docker airflow locally. UI login is disabled and made public! + useful to run local airflow with the new code changes and check the changes in airflow ui + while its running all the code changes are reflected in airflow after short time. 
+ :return: + """ + print(f"http://localhost:{self._get_service_port('airflow', 8080)}/home") + print(f"http://localhost:{self._get_service_port('airflow', 8080)}/dbtdocsview") + + sleep(99999999) diff --git a/tests/test_custom_adapter.py b/tests/test_custom_adapter.py new file mode 100644 index 0000000..ed1f771 --- /dev/null +++ b/tests/test_custom_adapter.py @@ -0,0 +1,41 @@ +from pathlib import Path +from unittest import TestCase + +from opendbt import OpenDbtProject + + +class TestOpenDbtProject(TestCase): + RESOURCES_DIR = Path(__file__).parent.joinpath("resources") + DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest") + + def test_run_with_custom_adapter(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', "{'dbt_custom_adapter': 'custom_adapters.DuckDBAdapterV1Custom'}"]) + with self.assertRaises(Exception) as context: + dp.run(command="compile") + print(context.exception) + self.assertTrue("Custom user defined test adapter activated" in str(context.exception)) + + def test_run_with_custom_adapter_mmodule_not_found(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', '{dbt_custom_adapter: not.exits.module.MyDbtTestAdapterV1}'] + ) + with self.assertRaises(Exception) as context: + dp.run(command="compile") + self.assertTrue("Module of provided adapter not found" in str(context.exception)) + + def test_run_with_custom_adapter_class_not_found(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', '{dbt_custom_adapter: test_custom_adapter.NotExistsAdapterClass}'] + ) + with self.assertRaises(Exception) as context: + dp.run(command="compile") + self.assertTrue("as no attribute 'NotExistsAdapterClass'" in str(context.exception)) + + def test_run_with_custom_adapter_wrong_name(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', 'dbt_custom_adapter: test_custom_adapterMyDbtTestAdapterV1'] + ) + with self.assertRaises(Exception) as context: + dp.run(command="compile") + self.assertTrue("Unexpected adapter class name" in str(context.exception)) diff --git a/tests/test_execute_materialization.py b/tests/test_execute_materialization.py new file mode 100644 index 0000000..1ced356 --- /dev/null +++ b/tests/test_execute_materialization.py @@ -0,0 +1,13 @@ +from pathlib import Path +from unittest import TestCase + +from opendbt import OpenDbtProject + + +class TestOpenDbtProject(TestCase): + RESOURCES_DIR = Path(__file__).parent.joinpath("resources") + DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest") + + def test_run_execute_materialization(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR) + dp.run(command="run", args=['--select', 'my_execute_dbt_model']) diff --git a/tests/test_executepython_materialization.py b/tests/test_executepython_materialization.py new file mode 100644 index 0000000..be51865 --- /dev/null +++ b/tests/test_executepython_materialization.py @@ -0,0 +1,19 @@ +from pathlib import Path +from unittest import TestCase + +from opendbt import OpenDbtProject + + +class TestOpenDbtProject(TestCase): + RESOURCES_DIR = Path(__file__).parent.joinpath("resources") + DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest") + + def test_run_executepython_materialization(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', 'dbt_custom_adapter: custom_adapters.DuckDBAdapterV2Custom']) + 
dp.run(command="run", args=['--select', 'my_executepython_dbt_model']) + + def test_run_executepython_materialization_subprocess(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR, + args=['--vars', 'dbt_custom_adapter: custom_adapters.DuckDBAdapterV2Custom']) + dp.run(command="run", args=['--select', 'my_executepython_dbt_model'], use_subprocess=True) diff --git a/tests/test_opendbt_project.py b/tests/test_opendbt_project.py new file mode 100644 index 0000000..c6a39d1 --- /dev/null +++ b/tests/test_opendbt_project.py @@ -0,0 +1,24 @@ +from importlib import import_module +from pathlib import Path +from unittest import TestCase + +import dbt + +from opendbt import OpenDbtProject + + +class TestOpenDbtProject(TestCase): + RESOURCES_DIR = Path(__file__).parent.joinpath("resources") + DBTTEST_DIR = RESOURCES_DIR.joinpath("dbttest") + + def test_run_compile(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR) + dp.run(command="compile") + + def test_run_run(self): + dp = OpenDbtProject(project_dir=self.DBTTEST_DIR, profiles_dir=self.DBTTEST_DIR) + dp.run(command="run", args=['--select', 'my_first_dbt_model+'], use_subprocess=True) + + def test_print_adapter_version(self): + import_module("dbt.adapters.__about__") + print(dbt.adapters.__about__.version)
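
For reference, the snippet below is a minimal usage sketch of the API this patch introduces, mirroring tests/test_executepython_materialization.py and tests/test_opendbt_project.py. It is illustrative only: the project path is the dbt test fixture shipped with the patch, and custom_adapters is the test module above, not a packaged adapter.

from pathlib import Path

from opendbt import OpenDbtProject

# dbt project fixture added by this patch (path relative to the repository root)
project_dir = Path("tests/resources/dbttest")

dp = OpenDbtProject(
    project_dir=project_dir,
    profiles_dir=project_dir,
    # dbt_custom_adapter routes adapter creation to a user-provided class (see opendbt/client.py);
    # DuckDBAdapterV2Custom adds submit_local_python_job(), used by the executepython materialization
    args=['--vars', 'dbt_custom_adapter: custom_adapters.DuckDBAdapterV2Custom'],
)

dp.run(command="compile")                                               # parse and compile the project
dp.run(command="run", args=['--select', 'my_executepython_dbt_model'])  # run the locally executed Python model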