Python example: multi-phase calculation
The following example shows a multi-phase transform function that computes the average value on a column of numbers in an input table. It first defines two transform functions, and then defines a factory that creates the phases using them.
See AvgMultiPhaseUDT.py in the examples distribution for the complete code.
Loading and using the example
Create the library and function:
=> CREATE LIBRARY pylib_avg AS '/home/dbadmin/udx/AvgMultiPhaseUDT.py' LANGUAGE 'Python';
CREATE LIBRARY
=> CREATE TRANSFORM FUNCTION myAvg AS NAME 'MyAvgFactory' LIBRARY pylib_avg;
CREATE TRANSFORM FUNCTION
You can then use the function in SELECT statements:
=> CREATE TABLE IF NOT EXISTS numbers(num FLOAT);
CREATE TABLE
=> COPY numbers FROM STDIN delimiter ',';
1
2
3
4
\.
=> SELECT myAvg(num) OVER() FROM numbers;
average | ignored_rows | total_rows
---------+--------------+------------
2.5 | 0 | 4
(1 row)
Setup
All Python UDxs must import the Vertica SDK. This example also imports another library.
import vertica_sdk
import math
Component transform functions
A multi-phase transform function must define two or more TransformFunction subclasses to be used in the phases. This example uses two classes: LocalCalculation, which does calculations on local partitions, and GlobalCalculation, which aggregates the results of all LocalCalculation instances to calculate a final result.
In both functions, the calculation is done in the processPartition() function:
class LocalCalculation(vertica_sdk.TransformFunction):
"""
This class is the first phase and calculates the local values for sum, ignored_rows and total_rows.
"""
def setup(self, server_interface, col_types):
server_interface.log("Setup: Phase0")
self.local_sum = 0.0
self.ignored_rows = 0
self.total_rows = 0
def processPartition(self, server_interface, input, output):
server_interface.log("Process Partition: Phase0")
while True:
self.total_rows += 1
if input.isNull(0) or math.isinf(input.getFloat(0)) or math.isnan(input.getFloat(0)):
# Null, Inf, or Nan is ignored
self.ignored_rows += 1
else:
self.local_sum += input.getFloat(0)
if not input.next():
break
output.setFloat(0, self.local_sum)
output.setInt(1, self.ignored_rows)
output.setInt(2, self.total_rows)
output.next()
class GlobalCalculation(vertica_sdk.TransformFunction):
"""
This class is the second phase and aggregates the values for sum, ignored_rows and total_rows.
"""
def setup(self, server_interface, col_types):
server_interface.log("Setup: Phase1")
self.global_sum = 0.0
self.ignored_rows = 0
self.total_rows = 0
def processPartition(self, server_interface, input, output):
server_interface.log("Process Partition: Phase1")
while True:
self.global_sum += input.getFloat(0)
self.ignored_rows += input.getInt(1)
self.total_rows += input.getInt(2)
if not input.next():
break
average = self.global_sum / (self.total_rows - self.ignored_rows)
output.setFloat(0, average)
output.setInt(1, self.ignored_rows)
output.setInt(2, self.total_rows)
output.next()
Multi-phase factory
A MultiPhaseTransformFunctionFactory ties together the individual functions as phases. The factory defines a TransformFunctionPhase for each function. Each phase defines createTransformFunction(), which calls the constructor for the corresponding TransformFunction, and getReturnType().
The first phase, LocalPhase, follows.
class MyAvgFactory(vertica_sdk.MultiPhaseTransformFunctionFactory):
""" Factory class """
class LocalPhase(vertica_sdk.TransformFunctionPhase):
""" Phase 1 """
def getReturnType(self, server_interface, input_types, output_types):
# sanity check
number_of_cols = input_types.getColumnCount()
if (number_of_cols != 1 or not input_types.getColumnType(0).isFloat()):
raise ValueError("Function only accepts one argument (FLOAT))")
output_types.addFloat("local_sum");
output_types.addInt("ignored_rows");
output_types.addInt("total_rows");
def createTransformFunction(cls, server_interface):
return LocalCalculation()
The second phase, GlobalPhase, does not check its inputs because the first phase already did. As with the first phase, createTransformFunction merely constructs and returns the corresponding TransformFunction.
class GlobalPhase(vertica_sdk.TransformFunctionPhase):
""" Phase 2 """
def getReturnType(self, server_interface, input_types, output_types):
output_types.addFloat("average");
output_types.addInt("ignored_rows");
output_types.addInt("total_rows");
def createTransformFunction(cls, server_interface):
return GlobalCalculation()
After defining the TransformFunctionPhase subclasses, the factory instantiates them and chains them together in getPhases().
ph0Instance = LocalPhase()
ph1Instance = GlobalPhase()
def getPhases(cls, server_interface):
cls.ph0Instance.setPrepass()
phases = [cls.ph0Instance, cls.ph1Instance]
return phases