Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add flink jar beta resources #1996

Merged
merged 2 commits into from
Jan 22, 2025
Merged

Conversation

byashimov
Copy link
Contributor

Add aiven_flink_jar_application, aiven_flink_jar_application_version and aiven_flink_jar_application_deployment BETA resources.

@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch from 16d44e8 to ea3509d Compare January 15, 2025 14:18
@byashimov byashimov marked this pull request as ready for review January 15, 2025 18:01
@byashimov byashimov requested a review from a team as a code owner January 15, 2025 18:02
@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch 4 times, most recently from ad00830 to 5106f09 Compare January 17, 2025 11:09
@byashimov
Copy link
Contributor Author

@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch from 5106f09 to c8736df Compare January 17, 2025 11:14
rriski
rriski previously approved these changes Jan 21, 2025
Copy link
Contributor

@rriski rriski left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM, some small suggestions. Tested resource creation with:

flink example
terraform {
  required_providers {
    aiven = {
      source  = "aiven-dev/aiven"
      version = "0.0.0+dev"
    }
  }
}

provider "aiven" {
  api_token = var.aiven_api_token
}

# Your project
data "aiven_project" "example" {
  project = "aiven-sbox-tf-riski"
}


resource "aiven_flink" "example" {
  project      = data.aiven_project.example.project
  cloud_name   = "google-europe-north1"
  service_name = "flink-rriski"
  plan         = "business-4"

  flink_user_config {
    flink_version        = 1.19
    number_of_task_slots = 4
    custom_code          = true
  }
}

resource "aiven_flink_jar_application" "example" {
  project      = data.aiven_project.example.project
  service_name = aiven_flink.example.service_name
  name         = "example-app-jar"
}

resource "aiven_flink_jar_application_version" "example" {
  project        = data.aiven_project.example.project
  service_name   = aiven_flink.example.service_name
  application_id = aiven_flink_jar_application.example.application_id
  source         = "./example.jar"
}

resource "aiven_flink_jar_application_deployment" "example" {
  project        = data.aiven_project.example.project
  service_name   = aiven_flink.example.service_name
  application_id = aiven_flink_jar_application.example.application_id
  version_id     = aiven_flink_jar_application_version.example.application_version_id
}

aiven-flink-tf

The jar itself seems to result in an exception:

flink jar exception
Failed to run jar in Flink: Failed to start a job `c51628da-17c4-445f-a077-e97a91025c3d_f4b17813-5360-4b4d-91b6-35684cb813c1.jar`. Flink Response: {'errors': ['Internal server error.', "<Exception on server side:
java.util.concurrent.CompletionException: org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'io.github.streamingwithflink.StreamingJob' was not found in the jar file.
	at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:192)
	at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.applyToConfiguration(JarHandlerUtils.java:149)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:100)
	at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.handleRequest(JarRunHandler.java:57)
	at org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
	at org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:196)
	at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:88)
	at java.base/java.util.Optional.ifPresent(Optional.java:183)
	at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
	at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:85)
	at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:50)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
	at org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:55)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.MessageToMessageDecoder.channelRead(MessageToMessageDecoder.java:103)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:208)
	at org.apache.flink.runtime.rest.FileUploadHandler.channelRead0(FileUploadHandler.java:69)
	at org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at org.apache.flink.shaded.netty4.io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at org.apache.flink.shaded.netty4.io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at org.apache.flink.shaded.netty4.io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at org.apache.flink.shaded.netty4.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at org.apache.flink.shaded.netty4.io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at org.apache.flink.shaded.netty4.io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.apache.flink.client.program.ProgramInvocationException: The program's entry point class 'io.github.streamingwithflink.StreamingJob' was not found in the jar file.
	at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:481)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:153)
	at org.apache.flink.client.program.PackagedProgram.<init>(PackagedProgram.java:65)
	at org.apache.flink.client.program.PackagedProgram$Builder.build(PackagedProgram.java:691)
	at org.apache.flink.runtime.webmonitor.handlers.utils.JarHandlerUtils$JarHandlerContext.toPackagedProgram(JarHandlerUtils.java:190)
	... 50 more
Caused by: java.lang.ClassNotFoundException: io.github.streamingwithflink.StreamingJob
	at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:594)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
	at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
	at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:527)
	at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:197)
	at java.base/java.lang.Class.forName0(Native Method)
	at java.base/java.lang.Class.forName(Class.java:398)
	at org.apache.flink.client.program.PackagedProgram.loadMainClass(PackagedProgram.java:479)
	... 54 more

End of exception on server side>"]}

Suggestion

Maybe we can use a different (very simple) jar that does not result in exceptions when ran on Aiven Flink and assert that the deployment itself succeeds too. I'd move flink_jar_application_version_test.go go to flink_jar_application_deployment_test.go and assert a succesfull deployment + application run there.

Documentation

I noticed that if custom_code in user confg is not enabled jar creation fails:

aiven_flink_jar_application.example: Creating...
╷
│ Error: [403 ServiceFlinkCreateJarApplication]: Custom code is not enabled for the service. Calling Jar Application related REST API endpoints is forbidden
│
│   with aiven_flink_jar_application.example,
│   on main.tf line 32, in resource "aiven_flink_jar_application" "example":
│   32: resource "aiven_flink_jar_application" "example" {

Toggling custom_code also forces recreation of the service.

FYI @staceysalamon-aiven @wojcik-dorota for input on potential docs changes.

Other

I noticed an unrelated issue: flink doesn't seem to support additional_disk_space:

Plan: 1 to add, 0 to change, 0 to destroy.
╷
│ Error: dynamic disk space is not configurable for this service
│
│   with aiven_flink.example,
│   on main.tf line 20, in resource "aiven_flink" "example":
│   20: resource "aiven_flink" "example" {

...but we show it in documentation https://registry.terraform.io/providers/aiven/aiven/latest/docs/resources/flink#additional_disk_space-1.

Another thing is that this leads to error:

flink_user_config {
  flink_version = 1.20
}

while this works:

flink_user_config {
  flink_version = 1.19
}

The error is:

│ Error: error creating a service: [400 ServiceCreate]: Invalid 'user_config': Invalid input for flink_version: '1.2' is not one of ['1.16', '1.19', '1.20']
│
│   with aiven_flink.example,
│   on main.tf line 20, in resource "aiven_flink" "example":
│   20: resource "aiven_flink" "example" {
│
╵

@wojcik-dorota
Copy link

wojcik-dorota commented Jan 21, 2025

Overall LGTM, some small suggestions. Tested resource creation with:

flink example
aiven-flink-tf

The jar itself seems to result in an exception:

flink jar exception

Suggestion

Maybe we can use a different (very simple) jar that does not result in exceptions when ran on Aiven Flink and assert that the deployment itself succeeds too. I'd move flink_jar_application_version_test.go go to flink_jar_application_deployment_test.go and assert a succesfull deployment + application run there.

Documentation

I noticed that if custom_code in user confg is not enabled jar creation fails:

aiven_flink_jar_application.example: Creating...
╷
│ Error: [403 ServiceFlinkCreateJarApplication]: Custom code is not enabled for the service. Calling Jar Application related REST API endpoints is forbidden
│
│   with aiven_flink_jar_application.example,
│   on main.tf line 32, in resource "aiven_flink_jar_application" "example":
│   32: resource "aiven_flink_jar_application" "example" {

Toggling custom_code also forces recreation of the service.

FYI @staceysalamon-aiven @wojcik-dorota for input on potential docs changes.

Other

I noticed an unrelated issue: flink doesn't seem to support additional_disk_space:

Plan: 1 to add, 0 to change, 0 to destroy.
╷
│ Error: dynamic disk space is not configurable for this service
│
│   with aiven_flink.example,
│   on main.tf line 20, in resource "aiven_flink" "example":
│   20: resource "aiven_flink" "example" {

...but we show it in documentation https://registry.terraform.io/providers/aiven/aiven/latest/docs/resources/flink#additional_disk_space-1.

Another thing is that this leads to error:

flink_user_config {
  flink_version = 1.20
}

while this works:

flink_user_config {
  flink_version = 1.19
}

The error is:

│ Error: error creating a service: [400 ServiceCreate]: Invalid 'user_config': Invalid input for flink_version: '1.2' is not one of ['1.16', '1.19', '1.20']
│
│   with aiven_flink.example,
│   on main.tf line 20, in resource "aiven_flink" "example":
│   20: resource "aiven_flink" "example" {
│
╵

Tagging @harshini-rangaswamy as a flink docs owner :-) re: potential docs changes

@harshini-rangaswamy
Copy link

harshini-rangaswamy commented Jan 21, 2025

Documentation

I noticed that if custom_code in user confg is not enabled jar creation fails:

aiven_flink_jar_application.example: Creating...
╷
│ Error: [403 ServiceFlinkCreateJarApplication]: Custom code is not enabled for the service. Calling Jar Application related REST API endpoints is forbidden
│
│   with aiven_flink_jar_application.example,
│   on main.tf line 32, in resource "aiven_flink_jar_application" "example":
│   32: resource "aiven_flink_jar_application" "example" {

Toggling custom_code also forces recreation of the service.

@rriski Is this behavior specific to Terraform? Via the Aiven Console, you can only enable Custom JARs during service creation. If this feature was not enabled at that point, you need to create a new service with Custom JARs enabled.
cc: @MichaelTansiniAiven

@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch 2 times, most recently from 34bce25 to ea83cb8 Compare January 21, 2025 12:23
@byashimov
Copy link
Contributor Author

Hey @rriski, thanks for the review.

I noticed that if custom_code in user confg is not enabled jar creation fails:

Added this to every example, I hope it helps.

resource "aiven_flink" "example" {
  project                 = data.aiven_project.example.project
  service_name            = "example-flink-service"
  cloud_name              = "google-europe-west1"
  plan                    = "business-4"
  maintenance_window_dow  = "monday"
  maintenance_window_time = "04:00:00"
  
  flink_user_config {
    // Enables upload and deployment of Custom JARs
    custom_code = true
  }
}

I noticed an unrelated issue: flink doesn't seem to support additional_disk_space:

Nice catch. I believe it was initially added to the shared fields list. Then flink was added. Now there are cusotmizers, diff suppressors, autoscaler and so on. It’s nearly possible to solve this issue.

Another thing is that this leads to error:
flink_version = 1.20
Invalid input for flink_version: '1.2' is not one of

I believe, it sends 1.2 instead of 1.20. This does the job:

flink_version = "1.20"

flink jar exception

The jar file comes from the book. Not an expert in java though to fix this.

@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch 2 times, most recently from bbb5ad7 to c51f7a4 Compare January 22, 2025 09:58
@byashimov byashimov force-pushed the byashimov-add-flink-jar-resources branch from c51f7a4 to 5c77706 Compare January 22, 2025 10:58
@byashimov
Copy link
Contributor Author

@byashimov byashimov merged commit 22694c8 into main Jan 22, 2025
10 checks passed
@byashimov byashimov deleted the byashimov-add-flink-jar-resources branch January 22, 2025 13:07
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants