[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan#548
[plan] Extract ResourceCache and PythonResourceBridge from AgentPlan#548weiqingy wants to merge 6 commits intoapache:mainfrom
Conversation
|
Both CI failures are unrelated to our changes:
|
| public class ResourceCache implements AutoCloseable { | ||
|
|
||
| private final Map<ResourceType, Map<String, ResourceProvider>> resourceProviders; | ||
| private final Map<ResourceType, Map<String, Resource>> cache = new HashMap<>(); |
There was a problem hiding this comment.
Should we use ConcurrentHashMap here? For task submit by ctx.durableExecuteAsync may read/write this hashmap parallel.
There was a problem hiding this comment.
Good question! This is intentional. After the refactoring, ResourceCache is created and owned by ActionExecutionOperator.open(), so it's scoped to a single operator subtask. In Flink's execution model, all access (processElement, open, close) runs on the same mailbox thread. durableExecuteAsync dispatches work to external threads, but resource resolution from the cache happens on the operator thread before dispatch. So HashMap is sufficient and avoids the overhead of ConcurrentHashMap. Let me know if this matches your understanding.
There was a problem hiding this comment.
but resource resolution from the cache happens on the operator thread before dispatch
I think the resource resolution may not always happens before dispatch.
Take the built in chat action as an example. In chat action, we submit chat task to external threads, and the chat task will call chat method of BaseChatModelSetup, which occurs in an asynchronous thread. In the chat method of chat model setup, it will resolves the correspond connection, prompt and tools, which I think may lead to concurrent access to the cache.
There was a problem hiding this comment.
You're right, thanks for pushing back on this. I traced the code path more carefully:
ChatModelAction → durableExecuteAsync(callable) → async pool thread runs chatModel.chat() → BaseChatModelSetup.chat() calls this.getResource.apply() to resolve connection, prompt, and tools → which hits ResourceCache.getResource().
So resource resolution does happen on async threads, not just the mailbox thread. I'll switch back to ConcurrentHashMap. Good catch!
|
Thanks for the review, @wenjin272! I’ve updated the PR to apply the same separation on the Python side as well - could you please take another look? |
wenjin272
left a comment
There was a problem hiding this comment.
LGTM. Could you take a look at your convenience @xintongsong ?
|
I checked the CI failures - both are LLM-dependent e2e tests and don’t appear to be caused by this PR. Test 1 (react_agent_test): The output 4444 = 2123 + 2321 proves our ResourceCache IS working correctly — the chat model was resolved, the add tool was resolved and called successfully. The LLM (qwen3:1.7b) simply stopped after one tool call instead of continuing to call multiply(4444, 312). This is LLM non-determinism. Test 2 (long_term_memory_test): This runs on the Flink remote runner, where there's exactly ONE FlinkRunnerContext with ONE ResourceCache. The behavior is identical to before. The failure is assert len(doc) == 1 after LLM-based compaction using qwen3:8b — if the model's summarization response is malformed, compaction produces incorrect output. We can re-run CI to confirm flakiness — if it fails again with different assertion values, that would further support LLM non-determinism. @wenjin272 do you have access to re-run the CI tests? It looks like admin rights are required. |
|
Hi, @weiqingy, sorry for I don't have access to re-run the CI. I acknowledge that the failing test is due to its own flakiness and not caused by this PR. |
|
Hi, @weiqingy, looks like the timeout of cross-language test may be related to this pr. Two attempts both failed. |
|
@weiqingy Thanks for working on this refactor. There're a few comments from my side. Please take a look.
|
| if (runnerContext != null) { | ||
| runnerContext.close(); | ||
| if (resourceCache != null) { | ||
| resourceCache.close(); |
There was a problem hiding this comment.
I investigated the cross-language test issues locally and found that resourceCache.close() must be called before pythonInterpreter.close(). After moving it to the very beginning of the close() method, the tests passed successfully.
The issue didn't appear earlier because I missed something while resolving a merge conflict, which caused runnerContext to be closed twice in the close() method. In reality, only the first close operation took effect.
There was a problem hiding this comment.
@wenjin272 Thanks for the review and for catching the close() ordering issue! Applied your fix — moved resourceCache.close() to the top of ActionExecutionOperator.close(), before pythonInterpreter.close(). Added a comment explaining the ordering constraint so it doesn't get accidentally re-ordered in the future.
Thanks for reviewing @xintongsong !
|
|
@xintongsong @wenjin272 The 2 CI failures are unrelated to our changes:
Notably, the cross-language test now passes (previously stuck/timing out) — confirming that the close() ordering fix works as @wenjin272 suggested. Could you please take another look at the PR? Thanks! |
There was a problem hiding this comment.
LGTM, please take a look at your convenience @xintongsong
I tend to disagree. I think the key question is which module does the responsibility of the class conceptually belongs to, not the dependency.
From that perspective, I think both As for the testing dependencies, first of all, it should be the production codes that affect/decide the testing codes, not the other way around. I briefly check the codes that referenced
That's much clearer. Thanks.
Sounds good. |
… to runtime Address reviewer feedback: ResourceCache and PythonMCPResourceDiscovery belong to "how to execute the agent" (runtime), not "what the agent looks like" (plan). - Move ResourceCache.java and PythonMCPResourceDiscovery.java to runtime module - Move Python resource_cache.py to runtime package - Extract ResourceCache-specific tests from AgentPlanTest into new ResourceCacheTest - Refactor 5 plan test files to use provider.provide() directly instead of ResourceCache - Update imports in runtime consumers (ActionExecutionOperator, RunnerContextImpl, etc.)
|
@xintongsong Addressed your review feedback:
Note: ResourceProvider itself stays in plan since AgentPlan.getResourceProviders() returns it — moving it would create a plan→runtime circular dependency. Let me know if you intended something different. |
|
The two failing checks are unrelated to this PR:
@wenjin272 @xintongsong Could you please help re-trigger the failed jobs and check whether the failures still reproduce? Thanks! |
|
@wenjin272 The cross-language test (ChatModelCrossLanguageTest) has been consistently failing with httpx.ReadTimeout on this PR. After digging into it, I found: On JDK 21, BaseChatModelSetup.chat() resolves resources (connection, prompt, tools) via getResource() inside the async thread (durableExecuteAsync). In cross-language scenarios, this ends up triggering Pemja calls into the Python interpreter from the async pool thread, which could lead to issues. It looks like your PR #571 addresses this by moving resource resolution into open() on the main thread, so chat() no longer calls getResource() from the async thread. WDYT? |
|
Hi, @weiqingy. Actually, the async execution is disabled for cross language resource. And without the pemja patch and #571, get cross language resource in async thread will cause pemja exception or jvm crash. So I think the |
@wenjin272 Thanks for the clarification - that makes sense. Given that the PR itself is unrelated to the flakiness, would it be reasonable to merge after re-triggering CI? I'll also open a follow-up issue to improve the test's resilience on slow runners (e.g., increasing the Ollama timeout or adding a retry). |
Linked issue: #547
Purpose of change
AgentPlan(624 lines) mixes plan definition, resource caching/resolution, Python bridge wiring, and serialization.This PR extracts two classes to separate concerns:
ResourceCache— lazy resource resolution, caching, and cleanup. Created by the operator inopen(), owned bythe operator lifecycle.
PythonResourceBridge— staticdiscoverPythonMCPResources()for Python MCP tool/prompt discovery. Calledduring operator init.
After extraction,
AgentPlanbecomes immutable after construction (~490 lines, down from 624). The removed publicmethods are
getResource(),close(), andsetPythonResourceAdapter().Tests
mvn test -pl plan— all plan module tests passmvn test -pl runtime— all runtime module tests pass./tools/lint.sh -c— formatting check passed./tools/ut.sh -j— full Java test suite passedAPI
Yes. Three public methods removed from
AgentPlan:getResource(String, ResourceType)— replaced byResourceCache.getResource()close()— replaced byResourceCache.close()setPythonResourceAdapter(PythonResourceAdapter)— replaced byPythonResourceBridge.discoverPythonMCPResources()Documentation
doc-neededdoc-not-neededdoc-included