
Commit dabe725

Port CI/CD example (#5)
1 parent 147dc7c commit dabe725

5 files changed: +254 −1 lines changed


README.md (+30)

@@ -129,6 +129,21 @@ All required information can be found in the web UI of Confluent's Cloud Console

 Examples should be runnable after setting all configuration options correctly.

+### Table API Playground using Python Interactive Shell
+
+For convenience, the repository also contains an init script for playing around with
+the Table API in an interactive manner.
+
+1. Run `poetry shell` to start a shell within the poetry virtualenv
+
+2. Point to the `cloud.properties` file: `export FLINK_PROPERTIES=./config/cloud.properties`
+
+3. Start Python with `python -i setup_pyshell.py`
+
+4. The `TableEnvironment` is pre-initialized from environment variables and available under `env`.
+
+5. Run your first "Hello world!" using `env.execute_sql("SELECT 'Hello world!'").print()`
+
 ## Configuration

 The Table API plugin needs a set of configuration options for establishing a connection to Confluent Cloud.
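As a quick illustration of the playground steps above, here is a minimal sketch of what a session could look like once `python -i setup_pyshell.py` is running. The calls mirror the suggestions that `setup_pyshell.py` prints on startup; nothing beyond the pre-initialized `env` object is assumed.

```python
# Inside `python -i setup_pyshell.py`, the init script has already created `env`,
# a TableEnvironment connected via the properties file in FLINK_PROPERTIES.

# Say hello: runs a small statement on Confluent Cloud and prints the result
env.execute_sql("SELECT 'Hello world!'").print()

# List the catalogs visible to the configured environment
print(env.list_catalogs())

# Read from one of the built-in example tables (an unbounded, continuously updating result)
env.from_path("examples.marketplace.clicks").execute().print()
```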
@@ -334,6 +349,21 @@ ConfluentTools.collect_materialized(table)
 ConfluentTools.print_materialized(table)
 ```

+### `ConfluentTools.get_statement_name` / `ConfluentTools.stop_statement`
+
+Additional lifecycle methods are available to control statements on Confluent Cloud after they have
+been submitted.
+
+```python
+# On TableResult object
+table_result = env.execute_sql("SELECT * FROM examples.marketplace.customers")
+statement_name = ConfluentTools.get_statement_name(table_result)
+ConfluentTools.stop_statement(table_result)
+
+# Based on statement name
+ConfluentTools.stop_statement_by_name(env, "table-api-2024-03-21-150457-36e0dbb2e366-sql")
+```
+
 ### Confluent Table Descriptor

 A table descriptor for creating tables located in Confluent Cloud programmatically.

flink_table_api_python/examples/table/example_02_unbounded_tables.py (+1 −1)

@@ -16,7 +16,7 @@
 # limitations under the License.
 ################################################################################

-from pyflink.table import (TableEnvironment, DataTypes)
+from pyflink.table import TableEnvironment
 from pyflink.table.confluent import ConfluentSettings
 from pyflink.table.expressions import col, row
 from flink_table_api_python.settings import CLOUD_PROPERTIES_PATH
flink_table_api_python/examples/table/example_08_integration_and_deployment.py (new file, +202)

@@ -0,0 +1,202 @@
################################################################################
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
import uuid
from pyflink.table import (TableEnvironment)
from pyflink.table.confluent import ConfluentSettings, ConfluentTools
from pyflink.table.expressions import col, lit, with_all_columns
from flink_table_api_python.settings import CLOUD_PROPERTIES_PATH

# NOTE: This example requires write access to a Kafka cluster. Fill out the
# given variables below with target catalog/database if this is fine for you.

# Fill this with an environment you have write access to
TARGET_CATALOG = ""

# Fill this with a Kafka cluster you have write access to
TARGET_DATABASE = ""

# Fill this with names of the Kafka Topics you want to create
SOURCE_TABLE = "ProductsMock"
TARGET_TABLE = "VendorsPerBrand"

# The following SQL will be tested on a finite subset of data before
# it gets deployed to production.
# In production, it will run on unbounded input.
# The '%s' parameterizes the SQL for testing.
SQL = "SELECT brand, COUNT(*) AS vendors FROM ProductsMock %s GROUP BY brand"


# An example that illustrates how to embed a table program into a CI/CD
# pipeline for continuous testing and rollout.
#
# Because we cannot rely on production data in this example, the program sets
# up some Kafka-backed tables with data during the setup phase.
#
# Afterward, the program can operate in two modes: one for integration testing
# (test phase) and one for deployment (deploy phase).
#
# A CI/CD workflow could execute the following:
#
#   poetry run example_08_integration_and_deployment setup
#   poetry run example_08_integration_and_deployment test
#   poetry run example_08_integration_and_deployment deploy
#
# NOTE: The example submits an unbounded background statement. Make sure
# to stop the statement in the Web UI afterward to clean up resources.
#
# The complete CI/CD workflow performs the following steps:
# - Create Kafka table 'ProductsMock' and 'VendorsPerBrand'.
# - Fill Kafka table 'ProductsMock' with data from marketplace examples table
#   'products'.
# - Test the given SQL on a subset of data in 'ProductsMock' with the help of
#   dynamic options.
# - Deploy an unbounded version of the tested SQL that writes into
#   'VendorsPerBrand'.
def run(args=None):
    """Process command line arguments."""
    if not args:
        args = sys.argv[1:]

    if len(args) == 0:
        print(
            "No mode specified. Possible values are 'setup', 'test', or 'deploy'.")
        exit(1)

    mode = args[0]

    settings = ConfluentSettings.from_file(CLOUD_PROPERTIES_PATH)
    env = TableEnvironment.create(settings)
    env.use_catalog(TARGET_CATALOG)
    env.use_database(TARGET_DATABASE)

    if mode == "setup":
        _set_up_program(env)
    elif mode == "test":
        _test_program(env)
    elif mode == "deploy":
        _deploy_program(env)
    else:
        print("Unknown mode: " + mode)
        exit(1)


# --------------------------------------------------------------------------
# Setup Phase
# --------------------------------------------------------------------------
def _set_up_program(env: TableEnvironment):
    print("Running setup...")

    print("Creating table..." + SOURCE_TABLE)
    # Create a mock table that has exactly the same schema as the example
    # `products` table.
    # The LIKE clause is very convenient for this task which is why we use SQL
    # here. Since we use little data, a bucket of 1 is important to satisfy the
    # `scan.bounded.mode` during testing.
    env.execute_sql(
        "CREATE TABLE IF NOT EXISTS `%s`\n"
        "DISTRIBUTED INTO 1 BUCKETS\n"
        "LIKE `examples`.`marketplace`.`products` (EXCLUDING OPTIONS)" %
        SOURCE_TABLE)

    print("Start filling table...")
    # Let Flink copy generated data into the mock table. Note that the
    # statement is unbounded and submitted as a background statement by default.
    pipeline_result = env.from_path("`examples`.`marketplace`.`products`") \
        .select(with_all_columns()) \
        .execute_insert(SOURCE_TABLE)

    print("Waiting for at least 200 elements in table...")
    # We start a second Flink statement for monitoring how the copying progresses
    count_result = env.from_path(SOURCE_TABLE).select(lit(1).count).execute()
    # This waits for the condition to be met:
    with count_result.collect() as results:
        for row in results:
            count = row[0]
            if (count >= 200):
                print("200 elements reached. Stopping...")
                break

    # By using a closable iterator, the foreground statement will be stopped
    # automatically when the iterator is closed. But the background statement
    # still needs a manual stop.
    ConfluentTools.stop_statement(pipeline_result)

    print("Creating table..." + TARGET_TABLE)
    # Create a table for storing the results after deployment.
    env.execute_sql(
        "CREATE TABLE IF NOT EXISTS `%s` \n"
        "(brand STRING, vendors BIGINT, PRIMARY KEY(brand) NOT ENFORCED)\n"
        "DISTRIBUTED INTO 1 BUCKETS" % TARGET_TABLE)


# -----------------------------------------------------------------------------
# Test Phase
# -----------------------------------------------------------------------------
def _test_program(env: TableEnvironment):
    print("Running test...")
    # Dynamic options allow influencing parts of a table scan. In this case, they
    # define a range (from start offset '0' to end offset '100') how to read from
    # Kafka. Effectively, they make the table bounded. If all tables are finite,
    # the statement can terminate. This allows us to run checks on the result.
    dynamicOptions = \
        "/*+ OPTIONS(\n" \
        "'scan.startup.mode' = 'specific-offsets',\n" \
        "'scan.startup.specific-offsets' = 'partition: 0, offset: 0',\n" \
        "'scan.bounded.mode' = 'specific-offsets',\n" \
        "'scan.bounded.specific-offsets' = 'partition: 0, offset: 100'\n" \
        ") */"

    print("Requesting test data...")
    result = env.execute_sql(SQL % dynamicOptions)
    rows = ConfluentTools.collect_materialized(result)

    print("Test data:")
    for row in rows:
        print(row)

    # Use the testing framework of your choice and add checks to verify the
    # correctness of the test data
    testSuccessful = any(r[0] == "Apple" for r in rows)

    if testSuccessful:
        print("Success. Ready for deployment.")
    else:
        print("Test was not successful")
        exit(1)


# ----------------------------------------------------------------------------
# Deploy Phase
# ----------------------------------------------------------------------------
def _deploy_program(env: TableEnvironment):
    print("Running deploy...")

    # It is possible to give a better statement name for deployment but make sure
    # that the name is unique within environment and region.
    statement_name = "vendors-per-brand-" + str(uuid.uuid4())
    env.get_config().set("client.statement-name", statement_name)

    # Execute the SQL without dynamic options.
    # The result is unbounded and piped into the target table.
    result = env.sql_query(SQL % "").execute_insert(TARGET_TABLE)

    # The API might add suffixes to manual statement names such as '-sql' or
    # '-api'. For the final submitted name, use the provided tools.
    finalName = ConfluentTools.get_statement_name(result)

    print("Statement has been deployed as: " + finalName)

pyproject.toml (+1)

@@ -15,6 +15,7 @@ example_04_creating_tables = "flink_table_api_python.examples.table.example_04_c
 example_05_table_pipelines = "flink_table_api_python.examples.table.example_05_table_pipelines:run"
 example_06_values_and_data_types = "flink_table_api_python.examples.table.example_06_values_and_data_types:run"
 example_07_changelogs = "flink_table_api_python.examples.table.example_07_changelogs:run"
+example_08_integration_and_deployment = "flink_table_api_python.examples.table.example_08_integration_and_deployment:run"

 [tool.poetry.dependencies]
 python = ">= 3.9, < 3.12"

setup_pyshell.py (new file, +20)

@@ -0,0 +1,20 @@
from pyflink.table.confluent import *
from pyflink.table import *
from pyflink.table.expressions import *

settings = ConfluentSettings.from_global_variables()
env = TableEnvironment.create(settings)

print()
print()
print("Welcome to Apache Flink® Table API on Confluent Cloud")
print()
print()
print("A TableEnvironment has been pre-initialized and is available under `env`.")
print()
print("Some inspirations to get started:")
print(" - Say hello: env.execute_sql(\"SELECT 'Hello world!'\").print()")
print(" - List catalogs: env.list_catalogs()")
print(" - Show something fancy: env.from_path(\"examples.marketplace.clicks\").execute().print()")
print()
print()
