Flink ShutdownHookManager throws "Trying to access closed classloader" when destroying itself

I'm working on a Flink application that has a Kinesis consumer and an Elasticsearch sink. My Flink job works fine and I get the results I want, except that at the end of application execution, when the shutdownExecutor method of org.apache.hadoop.util.ShutdownHookManager is called, I get this error:
Exception in thread "Thread-9" java.lang.IllegalStateException: Trying to access closed classloader. Please check if you store classloaders directly or indirectly in static fields. If the stacktrace suggests that the leak occurs in a third party library and cannot be fixed immediately, you can disable this check with the configuration 'classloader.check-leaked-classloader'.
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.ensureInner(FlinkUserCodeClassLoaders.java:164)
at org.apache.flink.runtime.execution.librarycache.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.getResource(FlinkUserCodeClassLoaders.java:183)
at org.apache.hadoop.conf.Configuration.getResource(Configuration.java:2830)
at org.apache.hadoop.conf.Configuration.getStreamReader(Configuration.java:3104)
at org.apache.hadoop.conf.Configuration.loadResource(Configuration.java:3063)
at org.apache.hadoop.conf.Configuration.loadResources(Configuration.java:3036)
at org.apache.hadoop.conf.Configuration.loadProps(Configuration.java:2914)
at org.apache.hadoop.conf.Configuration.getProps(Configuration.java:2896)
at org.apache.hadoop.conf.Configuration.get(Configuration.java:1246)
at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1863)
at org.apache.hadoop.conf.Configuration.getTimeDuration(Configuration.java:1840)
at org.apache.hadoop.util.ShutdownHookManager.getShutdownTimeout(ShutdownHookManager.java:183)
at org.apache.hadoop.util.ShutdownHookManager.shutdownExecutor(ShutdownHookManager.java:145)
at org.apache.hadoop.util.ShutdownHookManager.access$300(ShutdownHookManager.java:65)
at org.apache.hadoop.util.ShutdownHookManager$1.run(ShutdownHookManager.java:102)
I managed to debug it down to the shutdown executor trying to retrieve its shutdown timeout:
- getShutdownTimeout(conf), which uses
- new Configuration(), i.e. a default configuration. The exception is thrown on
- conf.getTimeDuration(...)
when the Configuration class goes through the FlinkUserCodeClassLoader wrapper, whose inner field is null at that point. Does anyone know how to configure a Flink application so that the inner field of FlinkUserCodeClassLoader is set to anything but null for the default Configuration object?
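For readers unfamiliar with the top two frames of the trace (ensureInner and getResource), here is a simplified model of that kind of guard. It is a hypothetical sketch, not Flink's actual class: the name SafetyNetLoaderSketch and its structure are assumptions made for illustration. It only shows how a wrapper that drops its inner loader on close() will throw on any later resource lookup.

```java
import java.net.URL;

// Hypothetical, simplified model of a "safety net" wrapper class loader.
// This is NOT Flink's implementation; it only mimics the behaviour seen in the trace.
class SafetyNetLoaderSketch extends ClassLoader implements AutoCloseable {

    // The wrapped loader; set to null once the wrapper is closed.
    private volatile ClassLoader inner;

    SafetyNetLoaderSketch(ClassLoader inner) {
        super(inner);
        this.inner = inner;
    }

    // Returns the wrapped loader, or fails if the wrapper was already closed.
    private ClassLoader ensureInner() {
        ClassLoader current = inner;
        if (current == null) {
            throw new IllegalStateException("Trying to access closed classloader.");
        }
        return current;
    }

    @Override
    public URL getResource(String name) {
        // Every lookup goes through the guard first.
        return ensureInner().getResource(name);
    }

    @Override
    public void close() {
        inner = null;
    }

    public static void main(String[] args) {
        SafetyNetLoaderSketch loader =
                new SafetyNetLoaderSketch(ClassLoader.getSystemClassLoader());
        loader.getResource("core-site.xml"); // fine while the wrapper is open
        loader.close();
        try {
            loader.getResource("core-site.xml"); // lookup after close
        } catch (IllegalStateException e) {
            System.out.println("Lookup after close failed: " + e.getMessage());
        }
    }
}
```

Under this model the inner field is not missing; it has been cleared on purpose. That would explain why anything still holding the wrapper after the job ends, such as a shutdown hook, fails on its first lookup.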
3 Replies
Lil Villa (OP), 3y ago
Resolved my problem by adding:
Thread.currentThread().setContextClassLoader(ClassLoader.getSystemClassLoader())
to my sink. I think this forces my job to use the system class loader instead of Flink's dynamic class loader, which for some reason had already been closed by the time the shutdown executor tried to use it for its own shutdown.
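One possible explanation for why this helps, offered as an assumption rather than something confirmed in this thread: a new thread copies the context class loader of the thread that creates it, and Hadoop's Configuration, as far as I understand, defaults to the current thread's context class loader. If the Hadoop shutdown-hook thread was created from a task thread, it would carry Flink's user-code loader into JVM shutdown. The sketch below uses plain JDK classes only; ContextLoaderDemo and the URLClassLoader stand-in are hypothetical names for illustration.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;
import java.util.concurrent.atomic.AtomicReference;

class ContextLoaderDemo {

    /**
     * Sets the given loader as the calling thread's context class loader, starts a
     * child thread, and returns the context class loader the child observed.
     * The caller's original context class loader is restored afterwards.
     */
    static ClassLoader loaderSeenByChild(ClassLoader parentContextLoader) {
        Thread current = Thread.currentThread();
        ClassLoader original = current.getContextClassLoader();
        AtomicReference<ClassLoader> seen = new AtomicReference<>();
        try {
            current.setContextClassLoader(parentContextLoader);
            // The child inherits the creator's context class loader at construction time.
            Thread child = new Thread(
                    () -> seen.set(Thread.currentThread().getContextClassLoader()));
            child.start();
            child.join();
        } catch (InterruptedException e) {
            current.interrupt();
            throw new IllegalStateException("Interrupted while waiting for child thread", e);
        } finally {
            current.setContextClassLoader(original);
        }
        return seen.get();
    }

    public static void main(String[] args) throws IOException {
        ClassLoader system = ClassLoader.getSystemClassLoader();
        // Hypothetical stand-in for a per-job user-code class loader.
        try (URLClassLoader userCodeLoader = new URLClassLoader(new URL[0], system)) {
            // Child created while the user-code loader is the context loader: inherits it.
            System.out.println(loaderSeenByChild(userCodeLoader) == userCodeLoader);
            // Child created after switching to the system loader: inherits that instead.
            System.out.println(loaderSeenByChild(system) == system);
        }
    }
}
```

Both lines print true. Note that the call in the sink changes the context class loader for everything that runs later on that thread, so code that relies on the user-code loader for dynamic class loading may behave differently.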