So you want to write a User-Defined Function for Flink? @ Current 2024 - Austin, TX

Abstract

Apache Flink has gained a lot of momentum over the past couple of years and is well known across the data industry for its versatility. Developers and data engineers alike tend to pick higher-level programming abstractions such as Flink's Table API and Flink SQL whenever reasonably possible. And yet, specific requirements can call for writing custom code and integrating with third-party libraries - Flink UDFs to the rescue!
This lightning talk shares a "first-timer's experience" with implementing a non-trivial scalar function for encrypting and decrypting data directly within Apache Flink jobs. We are looking at selected challenges and some of the "pain points" that came along with the respective Java implementation.
Join this session if you need to add custom processing logic to Apache Flink and plan to do so by writing your own user-defined functions. You'll walk away with a few practical, broadly applicable hints that should help reduce some of the friction you might face.

Examples Repo: https://github.com/hpgrahsl/current24-udf-examples

Recording: pending...

Hans-Peter Grahsl

September 18, 2024

Transcript

  1. Talk's Origin
     • my side project
     • Kryptonite for Kafka
     • bit.ly/k4k-repo
     @hpgrahsl — decodable.co | Current 2024 | Sept. 18th | Austin, TX
  2. My Use Case
     • cryptographic functions for Flink SQL / Table API
     • encrypt / decrypt individual fields
         SELECT ssn_enc FROM ( VALUES (ENCRYPT_UDF('123-45-6789')) ) as t(ssn_enc);
         tEnv.fromValues(/*...*/).select(call("DECRYPT_UDF", $("ssn_enc"), "").as("ssn"));
     • status: experimental
  3. Java Versions
     • JDK 8 - DON'T - deprecated since Flink 1.15
     • JDK 11 - since Flink 1.10
     • JDK 17 - since Flink 1.18
     • JDK 21 - beta with Flink 1.19
  4. Java Versions
     • UDF compiled for newer runtime (e.g. JDK 17)
     • Flink cluster uses older runtime (e.g. JDK 11)
     ⚡ java.lang.UnsupportedClassVersionError
         Exception in thread "..." java.lang.UnsupportedClassVersionError: com.g.h.f.talk.HelloUdf
         has been compiled by a more recent version of the Java Runtime (class file version 61.0),
         this version of the Java Runtime only recognizes class file versions up to 55.0
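
The numbers in that error message are class file major versions: 61 corresponds to Java 17 and 55 to Java 11. As a small sketch of how one could check a compiled UDF class before deploying it, the helper below reads the major version from a class file header using only the JDK. The class and method names (`ClassFileVersion`, `javaReleaseFor`, `majorVersionOf`) are made up for illustration and are not part of Flink or the talk's repo.

```java
import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical helper: inspects the class file version of a compiled class. */
final class ClassFileVersion {

    /** Maps a class file major version to a Java release (valid for Java 5 and later). */
    static int javaReleaseFor(int major) {
        return major - 44; // e.g. 55 -> 11, 61 -> 17
    }

    /** Reads the major version from the header of the given class's .class resource. */
    static int majorVersionOf(Class<?> type) throws IOException {
        String resource = "/" + type.getName().replace('.', '/') + ".class";
        try (InputStream in = type.getResourceAsStream(resource)) {
            if (in == null) {
                throw new IOException("class file not found: " + resource);
            }
            DataInputStream data = new DataInputStream(in);
            if (data.readInt() != 0xCAFEBABE) { // magic number of every class file
                throw new IOException("not a class file: " + resource);
            }
            data.readUnsignedShort();        // minor version, ignored here
            return data.readUnsignedShort(); // major version
        }
    }
}
```

Calling `ClassFileVersion.javaReleaseFor(ClassFileVersion.majorVersionOf(HelloUdf.class))` in a build check would report the Java release the UDF was compiled for, which can then be compared with the cluster's runtime.
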
  5. Java Versions
     • UDF works with JDK 11 ✅
     • UDF fails with JDK 17 ❌
     • JDK 17+ enforces strong encapsulation
     • use --add-opens / --add-exports flags if applicable
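
To make the strong-encapsulation point concrete, here is a minimal JDK-only probe. It attempts deep reflection into a JDK-internal field, which is the kind of access some third-party libraries perform under the hood. The class name `EncapsulationProbe` is hypothetical, and whether a given UDF is affected depends entirely on its dependencies.

```java
import java.lang.reflect.Field;
import java.lang.reflect.InaccessibleObjectException;

/** Hypothetical probe: checks whether deep reflection into java.lang is permitted. */
final class EncapsulationProbe {

    /** Returns true if the private 'value' field of java.lang.String can be made accessible. */
    static boolean canOpenStringInternals() {
        try {
            Field value = String.class.getDeclaredField("value");
            value.setAccessible(true); // rejected on JDK 17 unless java.lang is opened
            return true;
        } catch (InaccessibleObjectException | NoSuchFieldException e) {
            return false;
        }
    }
}
```

On a JDK 17 runtime started without extra flags the probe is expected to return false. Starting the JVM with `--add-opens java.base/java.lang=ALL-UNNAMED` opens that package to code on the class path. How such flags are passed to the Flink processes depends on the deployment and is not covered here.
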
  6. Type Inference
     • Flink table ecosystem / SQL -> strongly typed
     • type mapping for UDF params & return values
  7. Type Inference - Reflection
     • UDF input/output types automatically inferred
         public class EncryptUdf extends ScalarFunction {
           // -> input param type STRING
           // <- output type STRING
           public String eval(final String data) {
             // encryption and Base64 encoding of ciphertext here...
           }
         }
  8. Type Inference - Annotations
     • @DataTypeHint, @FunctionHint, @ArgumentHint
         public class EncryptUdf extends ScalarFunction {
           // -> input param type ANY
           // <- output type STRING
           public String eval(
               @DataTypeHint(inputGroup = InputGroup.ANY) final Object data) {
             // encryption and Base64 encoding of ciphertext here...
           }
         }
  9. Type Inference - Programmatic
     • define type inference in code
     • supports custom logic to derive I/O types
     • disables other inference mechanisms
  10. Overloading
     • UDFs must provide evaluation methods
     • convention-based:
         • scalar function: 1+ public eval(...) method(s)
         • fails at runtime if violated -> ValidationException
  11. Overloading Caveat
     • be careful with "extensive overloading"
     • a dozen+ overloadings can be problematic
         // just 14 basic overloadings caused a hang > 1 min for me
         public class MyOverloadedUdf extends ScalarFunction {
           public String eval(String myString) {/* ... */}
           //... 12 more overloadings
           public String eval(Double myDouble1, Double myDouble2) {/* ... */}
         }
  12. Determinism
     • UDFs signal whether or not to be deterministic
     • "same UDF input -> same UDF output"
     • determinism is assumed by default
  13. Consequence of Determinism
     • Deterministic UDF with constant params?
       -> UDF might only be called once during planning
  14. Non-Determinism
     • override isDeterministic method if necessary
         public class MyNonDeterministicUdf extends ScalarFunction {
           //NOTE: defaults to true
           @Override
           public boolean isDeterministic() {
             return false;
           }
           //some non-deterministic eval method(s) here...
         }
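
One case where "same input -> same output" does not hold is randomized encryption. The sketch below uses AES-GCM with a fresh random IV per call and Base64-encodes the result, using only the JDK. It is an illustration under stated assumptions, not the implementation used in the talk or in Kryptonite for Kafka: the class name `RandomizedCrypto`, the IV-prefix layout, and the raw 16-byte key are all choices made for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import java.util.Base64;
import javax.crypto.Cipher;
import javax.crypto.spec.GCMParameterSpec;
import javax.crypto.spec.SecretKeySpec;

/** Hypothetical helper an eval(...) method could delegate to. */
final class RandomizedCrypto {
    private static final int IV_BYTES = 12;  // commonly used IV length for GCM
    private static final int TAG_BITS = 128; // authentication tag length
    private static final SecureRandom RANDOM = new SecureRandom();

    /** Encrypts with a fresh random IV; returns Base64(IV || ciphertext || tag). */
    static String encrypt(byte[] key, String plaintext) {
        try {
            byte[] iv = new byte[IV_BYTES];
            RANDOM.nextBytes(iv);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.ENCRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(TAG_BITS, iv));
            byte[] ciphertext = cipher.doFinal(plaintext.getBytes(StandardCharsets.UTF_8));
            byte[] out = ByteBuffer.allocate(iv.length + ciphertext.length)
                    .put(iv).put(ciphertext).array();
            return Base64.getEncoder().encodeToString(out);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("encryption failed", e);
        }
    }

    /** Reverses encrypt(...): reads the IV prefix, then decrypts and verifies the rest. */
    static String decrypt(byte[] key, String encoded) {
        try {
            byte[] in = Base64.getDecoder().decode(encoded);
            Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
            cipher.init(Cipher.DECRYPT_MODE, new SecretKeySpec(key, "AES"),
                    new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
            byte[] plaintext = cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
            return new String(plaintext, StandardCharsets.UTF_8);
        } catch (GeneralSecurityException e) {
            throw new IllegalStateException("decryption failed", e);
        }
    }
}
```

Encrypting the same value twice with the same key yields two different strings, while both decrypt to the original. A UDF built this way would therefore be a candidate for returning false from isDeterministic(), so that a call with constant arguments is not reduced to a single evaluation during planning.
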
  15. Configuration
     more elaborate UDFs might need:
     • initialization
         • override open(FunctionContext ctx) method
     • configuration
         • Flink job params
         • environment variables
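
With two configuration sources in play, it helps to fix a lookup order. The sketch below resolves a setting from job parameters first, then from environment variables, then from a default. It is plain Java so it can run on its own; the class name `UdfSettings`, the precedence order, and the key-to-variable naming rule are assumptions for illustration. Inside a real UDF the job parameter lookup would typically go through `FunctionContext#getJobParameter(key, defaultValue)` in the overridden open method.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical settings lookup: job parameter, then environment variable, then default. */
final class UdfSettings {

    /** Resolves a setting by key using the precedence described above. */
    static String resolve(String key, Map<String, String> jobParams,
                          Map<String, String> env, String defaultValue) {
        String fromJob = jobParams.get(key);
        if (fromJob != null) {
            return fromJob;
        }
        String fromEnv = env.get(toEnvName(key));
        return fromEnv != null ? fromEnv : defaultValue;
    }

    /** Converts a key such as "cipher.key-id" to an env var name such as "CIPHER_KEY_ID". */
    static String toEnvName(String key) {
        return key.toUpperCase(Locale.ROOT).replace('.', '_').replace('-', '_');
    }
}
```

A call such as `UdfSettings.resolve("cipher.key-id", jobParams, System.getenv(), "default-key")` keeps the lookup logic testable without starting a Flink job.
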
  16. TEST, TEST, ... and TEST again
  17. Unit Testing UDFs
         @ParameterizedTest(name = /* ... */)
         @CsvSource({
           "developers, ES, Hola developers!",
           "Current'24, EN, Hello Current'24!"
         })
         @DisplayName(/* ... */)
         void testHelloUdfWithTwoParams(String who, String lang, String result) {
           var udf = new HelloUdf();
           assertEquals(result, udf.eval(who,lang));
         }
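
The slide does not show HelloUdf itself. For orientation, here is a guess at a two-argument eval that would satisfy the two test rows above. It is not the code from the examples repo, and the class name `HelloUdfSketch` and the fallback to English are assumptions. In a Flink project the class would extend ScalarFunction; that is left out so the sketch compiles with the JDK alone.

```java
import java.util.Locale;

/** Hypothetical stand-in for HelloUdf; a real UDF would extend ScalarFunction. */
final class HelloUdfSketch {

    /** Builds a greeting for 'who' in the language given by the code 'lang'. */
    public String eval(String who, String lang) {
        String code = lang == null ? "" : lang.trim().toUpperCase(Locale.ROOT);
        String greeting;
        switch (code) {
            case "ES":
                greeting = "Hola";
                break;
            default: // assumed fallback, covers "EN" and unknown codes
                greeting = "Hello";
        }
        return greeting + " " + who + "!";
    }
}
```

Because eval is an ordinary public method, it can be unit-tested by instantiating the class directly, as the test on the slide does.
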
  18. Integration Testing UDFs
     • use mini cluster via JUnit5 Extension
         @RegisterExtension
         public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
           new MiniClusterExtension(
             new MiniClusterResourceConfiguration.Builder()
               .setNumberTaskManagers(1)
               .setNumberSlotsPerTaskManager(1)
               .build()
           );
  19. Integration Testing UDFs
     • prepare environment and register UDF
         @BeforeAll
         static void setUp(@InjectMiniCluster MiniCluster miniCluster) {
           ENV = StreamExecutionEnvironment.getExecutionEnvironment();
           T_ENV = /* ... */;
           T_ENV.createTemporaryFunction("HELLO_UDF", HelloUdf.class);
         }
  20. Integration Testing UDFs
     • create table data, execute query, and verify results
         @Test
         public void testHelloUdfWithValidInputs() throws Exception {
           T_ENV.createTemporaryView("input_table",T_ENV.fromValues(/* ... */));
           var outputTable = T_ENV.sqlQuery("""
             SELECT HELLO_UDF(who,lang) AS udf_output FROM input_table
             """);
           /* ... */
           assertThat(results, containsInAnyOrder(/* ... */));
         }
  21. E2E Testing UDFs
     • @Testcontainers + @Container with ComposeContainer
         @Testcontainers
         public class MyEndtoEndUdfsTest {
           @Container
           static ComposeContainer COMPOSE_CONTAINER =
             new ComposeContainer(new File("compose.yaml"))
               .withExposedService("jobmanager", 8081, Wait.forListeningPort());
           /* ... */
         }
  22. Code Repo
     • basic UDF examples for the discussed aspects
  23. Let's chat @ the Decodable booth