Flink: Hadoop ShutdownHookManager throws "Trying to access closed classloader" when shutting down

I'm working on a Flink application that has a Kinesis consumer and an Elasticsearch sink. The job works fine and I get the results I want, except that at the end of execution, when org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor is called, I get this error:
Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
    at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
    at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2830)
    at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3104)
    at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3063)
    at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:3036)
    at org.apache.hadoop.conf.Configuration.loadProps(Configuration.java:2914)
    at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2896)
    at org.apache.hadoop.conf.Configuration.get(Configuration.java:1246)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1863)
    at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
    at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
    at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
    at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
    at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)

I debugged it down to the shutdown executor trying to retrieve the shutdown timeout:
- getShutdownTimeout(conf);
which uses a default configuration:
- new Configuration()
The exception is thrown on:
- conf.getTimeDuration(...)
when the Configuration class loads its resources through Flink's user-code classloader wrapper (FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader), whose inner classloader field is already null at that point.
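To illustrate what I believe is happening, here is a minimal sketch of the pattern. It is not Flink's actual code: SafetyNetLoader, lookup and ClosedLoaderSketch are hypothetical names, and the wrapper is a simplified stand-in that assumes the real SafetyNetWrapperClassLoader drops its inner classloader on close and throws on any later access.

```java
import java.net.URL;

public class ClosedLoaderSketch {

    /** Simplified, hypothetical stand-in for Flink's safety-net wrapper (not the real class). */
    static final class SafetyNetLoader extends ClassLoader implements AutoCloseable {
        // Assumption: the wrapper delegates to an inner loader and nulls it on close.
        private volatile ClassLoader inner;

        SafetyNetLoader(ClassLoader inner) {
            super(null);
            this.inner = inner;
        }

        // Fails fast once the wrapper has been closed.
        private ClassLoader ensureInner() {
            ClassLoader current = inner;
            if (current == null) {
                throw new IllegalStateException("Trying to access closed classloader.");
            }
            return current;
        }

        @Override
        public URL getResource(String name) {
            return ensureInner().getResource(name);
        }

        @Override
        public void close() {
            inner = null;
        }
    }

    /** Returns "ok" if the lookup did not throw, otherwise the exception message. */
    static String lookup(ClassLoader loader, String resource) {
        try {
            loader.getResource(resource);
            return "ok";
        } catch (IllegalStateException e) {
            return e.getMessage();
        }
    }

    public static void main(String[] args) {
        SafetyNetLoader loader = new SafetyNetLoader(ClassLoader.getSystemClassLoader());
        // While the job is running, resource lookups are delegated normally.
        System.out.println(lookup(loader, "core-default.xml"));
        // The job finishes and the user-code classloader is closed.
        loader.close();
        // A later lookup, like the one made from the shutdown hook, now fails.
        System.out.println(lookup(loader, "core-default.xml"));
    }
}
```

In this sketch the second lookup fails only because it happens after close(), which matches the order I see: the job finishes first and the Hadoop shutdown hook runs afterwards. The error message itself suggests disabling the check via 'classloader.check-leaked-classloader', but as far as I can tell that would only hide the symptom.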

Does anyone know how to configure a Flink application so that the inner field of the FlinkUserCodeClassLoader wrapper is not null when the default Configuration object is used during shutdown?