Skip to content

Commit

Permalink
Move create topic from AzureServiceBusTopicCreateOperator to `Admin…
Browse files Browse the repository at this point in the history
…ClientHook` (apache#45297)

* Move create topic to hook in Azure Service Bus provider

* Fix indenting of documentation
  • Loading branch information
perry2of5 authored Jan 1, 2025
1 parent 7bbda16 commit cda0e9e
Show file tree
Hide file tree
Showing 4 changed files with 151 additions and 35 deletions.
88 changes: 88 additions & 0 deletions providers/src/airflow/providers/microsoft/azure/hooks/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
ServiceBusSender,
)
from azure.servicebus.management import (
AuthorizationRule,
CorrelationRuleFilter,
QueueProperties,
ServiceBusAdministrationClient,
Expand Down Expand Up @@ -194,6 +195,93 @@ def delete_queue(self, queue_name: str) -> None:
with self.get_conn() as service_mgmt_conn:
service_mgmt_conn.delete_queue(queue_name)

def create_topic(
self,
topic_name: str,
azure_service_bus_conn_id: str = "azure_service_bus_default",
default_message_time_to_live: datetime.timedelta | str | None = None,
max_size_in_megabytes: int | None = None,
requires_duplicate_detection: bool | None = None,
duplicate_detection_history_time_window: datetime.timedelta | str | None = None,
enable_batched_operations: bool | None = None,
size_in_bytes: int | None = None,
filtering_messages_before_publishing: bool | None = None,
authorization_rules: list[AuthorizationRule] | None = None,
support_ordering: bool | None = None,
auto_delete_on_idle: datetime.timedelta | str | None = None,
enable_partitioning: bool | None = None,
enable_express: bool | None = None,
user_metadata: str | None = None,
max_message_size_in_kilobytes: int | None = None,
) -> str:
"""
Create a topic by connecting to service Bus Admin client.
:param topic_name: Name of the topic.
:param default_message_time_to_live: ISO 8601 default message time span to live value. This is
the duration after which the message expires, starting from when the message is sent to Service
Bus. This is the default value used when TimeToLive is not set on a message itself.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param max_size_in_megabytes: The maximum size of the topic in megabytes, which is the size of
memory allocated for the topic.
:param requires_duplicate_detection: A value indicating if this topic requires duplicate
detection.
:param duplicate_detection_history_time_window: ISO 8601 time span structure that defines the
duration of the duplicate detection history. The default value is 10 minutes.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param enable_batched_operations: Value that indicates whether server-side batched operations
are enabled.
:param size_in_bytes: The size of the topic, in bytes.
:param filtering_messages_before_publishing: Filter messages before publishing.
:param authorization_rules: List of Authorization rules for resource.
:param support_ordering: A value that indicates whether the topic supports ordering.
:param auto_delete_on_idle: ISO 8601 time span idle interval after which the topic is
automatically deleted. The minimum duration is 5 minutes.
Input value of either type ~datetime.timedelta or string in ISO 8601 duration format
like "PT300S" is accepted.
:param enable_partitioning: A value that indicates whether the topic is to be partitioned
across multiple message brokers.
:param enable_express: A value that indicates whether Express Entities are enabled. An express
queue holds a message in memory temporarily before writing it to persistent storage.
:param user_metadata: Metadata associated with the topic.
:param max_message_size_in_kilobytes: The maximum size in kilobytes of message payload that
can be accepted by the queue. This feature is only available when using a Premium namespace
and Service Bus API version "2021-05" or higher.
The minimum allowed value is 1024 while the maximum allowed value is 102400. Default value is 1024.
"""
if topic_name is None:
raise TypeError("Topic name cannot be None.")

with self.get_conn() as service_mgmt_conn:
try:
topic_properties = service_mgmt_conn.get_topic(topic_name)
except ResourceNotFoundError:
topic_properties = None
if topic_properties and topic_properties.name == topic_name:
self.log.info("Topic name already exists")
return topic_properties.name
topic = service_mgmt_conn.create_topic(
topic_name=topic_name,
default_message_time_to_live=default_message_time_to_live,
max_size_in_megabytes=max_size_in_megabytes,
requires_duplicate_detection=requires_duplicate_detection,
duplicate_detection_history_time_window=duplicate_detection_history_time_window,
enable_batched_operations=enable_batched_operations,
size_in_bytes=size_in_bytes,
filtering_messages_before_publishing=filtering_messages_before_publishing,
authorization_rules=authorization_rules,
support_ordering=support_ordering,
auto_delete_on_idle=auto_delete_on_idle,
enable_partitioning=enable_partitioning,
enable_express=enable_express,
user_metadata=user_metadata,
max_message_size_in_kilobytes=max_message_size_in_kilobytes,
)
self.log.info("Created Topic %s", topic.name)
return topic.name

def create_subscription(
self,
topic_name: str,
Expand Down
46 changes: 17 additions & 29 deletions providers/src/airflow/providers/microsoft/azure/operators/asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
from collections.abc import Sequence
from typing import TYPE_CHECKING, Any, Callable

from azure.core.exceptions import ResourceNotFoundError

from airflow.models import BaseOperator
from airflow.providers.microsoft.azure.hooks.asb import AdminClientHook, MessageHook

Expand Down Expand Up @@ -313,33 +311,23 @@ def execute(self, context: Context) -> str:
# Create the hook
hook = AdminClientHook(azure_service_bus_conn_id=self.azure_service_bus_conn_id)

with hook.get_conn() as service_mgmt_conn:
try:
topic_properties = service_mgmt_conn.get_topic(self.topic_name)
except ResourceNotFoundError:
topic_properties = None
if topic_properties and topic_properties.name == self.topic_name:
self.log.info("Topic name already exists")
return topic_properties.name
topic = service_mgmt_conn.create_topic(
topic_name=self.topic_name,
default_message_time_to_live=self.default_message_time_to_live,
max_size_in_megabytes=self.max_size_in_megabytes,
requires_duplicate_detection=self.requires_duplicate_detection,
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
enable_batched_operations=self.enable_batched_operations,
size_in_bytes=self.size_in_bytes,
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
authorization_rules=self.authorization_rules,
support_ordering=self.support_ordering,
auto_delete_on_idle=self.auto_delete_on_idle,
enable_partitioning=self.enable_partitioning,
enable_express=self.enable_express,
user_metadata=self.user_metadata,
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
)
self.log.info("Created Topic %s", topic.name)
return topic.name
return hook.create_topic(
topic_name=self.topic_name,
default_message_time_to_live=self.default_message_time_to_live,
max_size_in_megabytes=self.max_size_in_megabytes,
requires_duplicate_detection=self.requires_duplicate_detection,
duplicate_detection_history_time_window=self.duplicate_detection_history_time_window,
enable_batched_operations=self.enable_batched_operations,
size_in_bytes=self.size_in_bytes,
filtering_messages_before_publishing=self.filtering_messages_before_publishing,
authorization_rules=self.authorization_rules,
support_ordering=self.support_ordering,
auto_delete_on_idle=self.auto_delete_on_idle,
enable_partitioning=self.enable_partitioning,
enable_express=self.enable_express,
user_metadata=self.user_metadata,
max_message_size_in_kilobytes=self.max_message_size_in_kilobytes,
)


class AzureServiceBusSubscriptionCreateOperator(BaseOperator):
Expand Down
20 changes: 20 additions & 0 deletions providers/tests/microsoft/azure/hooks/test_asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -119,6 +119,26 @@ def test_delete_queue_exception(self, mock_sb_admin_client):
with pytest.raises(TypeError):
hook.delete_queue(None)

# Test creating a topic using hook method `create_topic`
@mock.patch("azure.servicebus.management.TopicProperties")
@mock.patch(f"{MODULE}.AdminClientHook.get_conn")
def test_create_topic(self, mock_sb_admin_client, mock_topic_properties):
"""
Test `create_topic` hook function with mocking connection, topic properties value and
the azure service bus `create_topic` function
"""
topic_name = "test_topic_name"
mock_topic_properties.name = topic_name
mock_sb_admin_client.return_value.__enter__.return_value.create_topic.return_value = (
mock_topic_properties
)
hook = AdminClientHook(azure_service_bus_conn_id=self.conn_id)
with mock.patch.object(hook.log, "info") as mock_log_info:
hook.create_topic(topic_name)
assert mock_topic_properties.name == topic_name

mock_log_info.assert_called_with("Created Topic %s", topic_name)

# Test creating subscription with topic name and subscription name using hook method `create_subscription`
@mock.patch("azure.servicebus.management.SubscriptionProperties")
@mock.patch(f"{MODULE}.AdminClientHook.get_conn")
Expand Down
32 changes: 26 additions & 6 deletions providers/tests/microsoft/azure/operators/test_asb.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,19 +255,39 @@ def test_init(self):
@mock.patch("azure.servicebus.management.TopicProperties")
def test_create_topic(self, mock_topic_properties, mock_get_conn):
"""
Test AzureServiceBusTopicCreateOperator passed with the topic name
mocking the connection details, hook create_topic function
Test AzureServiceBusSubscriptionCreateOperator passed with the subscription name, topic name
mocking the connection details, hook create_subscription function
"""
print("Wazzup doc")
asb_create_topic = AzureServiceBusTopicCreateOperator(
task_id="asb_create_topic",
topic_name=TOPIC_NAME,
)
mock_topic_properties.name = TOPIC_NAME
mock_get_conn.return_value.__enter__.return_value.create_topic.return_value = mock_topic_properties

with mock.patch.object(asb_create_topic.log, "info") as mock_log_info:
asb_create_topic.execute(None)
mock_log_info.assert_called_with("Created Topic %s", TOPIC_NAME)
# create the topic
created_topic_name = asb_create_topic.execute(None)
# ensure the topic name is returned
assert created_topic_name == TOPIC_NAME
# ensure create_subscription is called with the correct arguments on the connection
mock_get_conn.return_value.__enter__.return_value.create_topic.assert_called_once_with(
topic_name=TOPIC_NAME,
default_message_time_to_live=None,
max_size_in_megabytes=None,
requires_duplicate_detection=None,
duplicate_detection_history_time_window=None,
enable_batched_operations=None,
size_in_bytes=None,
filtering_messages_before_publishing=None,
authorization_rules=None,
support_ordering=None,
auto_delete_on_idle=None,
enable_partitioning=None,
enable_express=None,
user_metadata=None,
max_message_size_in_kilobytes=None,
)
print("Later Gator")

@mock.patch("airflow.providers.microsoft.azure.hooks.asb.AdminClientHook")
def test_create_subscription_exception(self, mock_sb_admin_client):
Expand Down

0 comments on commit cda0e9e

Please sign in to comment.