Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade the version and documents of streamis 0.2.5 #61

Open
wants to merge 647 commits into
base: dev-0.2.5
Choose a base branch
from
Open
Changes from 1 commit
Commits
Show all changes
647 commits
Select commit Hold shift + click to select a range
f3b5bfb
Update images.
wushengyeyouya Mar 1, 2022
5b59cff
Update images.
wushengyeyouya Mar 1, 2022
121ef96
Changed the version of some dependencies since Dependabot alerts it.
wushengyeyouya Mar 1, 2022
3d55c1e
Add mysql connector jar.
green241 Mar 1, 2022
c0be1a0
Set upload zip file size to 500MB.(Sometimes the fat package built by…
green241 Mar 1, 2022
617eae6
Merge pull request #14 from green241/main
wushengyeyouya Mar 1, 2022
e4f7b29
Fix the problem that fetch workspace users failed.
wushengyeyouya Mar 2, 2022
804f25b
Merge remote-tracking branch 'origin/main' into main
wushengyeyouya Mar 2, 2022
147003c
Fix the problem that alert records will persistent failed by dao.
wushengyeyouya Mar 2, 2022
559837e
modify stoped to stopped
Mar 7, 2022
9c458d4
Fix the problem that method call in circle.
wushengyeyouya Apr 1, 2022
7ecd67f
Merge remote-tracking branch 'origin/main' into main
wushengyeyouya Apr 1, 2022
95d36a5
Add support of extra flink configs.
wushengyeyouya Apr 6, 2022
601d0cb
Consider the situation of no matched configuration in web page
Davidhua1996 Apr 17, 2022
f0e365c
Enable the developer to update/insert the configuration of users; Att…
Davidhua1996 Apr 17, 2022
d9fcda6
Adjust the streamis_ddl.sql
Davidhua1996 Apr 17, 2022
66ab96c
Adjust the streamis_ddl.sql to accept large value of field
Davidhua1996 Apr 18, 2022
37faddf
Redefine the project file regex; Filter and replace the specific para…
Davidhua1996 May 5, 2022
8e4e52e
Roll the version to "0.2.0"; Change the dependency version of linkis …
Davidhua1996 May 10, 2022
79cf5ab
Init the streamis AppConn
Davidhua1996 May 10, 2022
c25fdb0
feat: 【前端】流式应用的 Yarn 执行日志展示
May 11, 2022
f81ad9e
feat: modify detail tab i18n
May 11, 2022
2d7ffa8
add streamis Appconn
May 11, 2022
1ea2c62
Merge branch 'dev-0.2.0' of https://github.com/WeDataSphere/Streamis …
May 11, 2022
7b52f8b
feat: 初步写下前端逻辑,还剩进度条
May 12, 2022
e25ee11
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
May 12, 2022
54177a8
update streamis Appconn
May 12, 2022
db7a846
feat: 失败信息列表也加一下
May 13, 2022
0cf3cbf
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
May 13, 2022
f45b3a1
Adjust dependency and structure of the job launcher module and add Jo…
Davidhua1996 May 13, 2022
42b1156
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 13, 2022
7fc81b2
Refresh the TODO list
Davidhua1996 May 13, 2022
58bbf27
Adjust the module of job launcher for Linkis.
Davidhua1996 May 15, 2022
a26e008
Note the progress of AbstractLinkisJobStateFetcher
Davidhua1996 May 16, 2022
7dead32
feat: fake some code
May 16, 2022
e4eada6
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
May 16, 2022
6b28a2a
fix: forget to change the test code
May 16, 2022
b13ddce
feat: temporarily create test.vue
May 16, 2022
bc09b6b
update streamis Appconn
May 16, 2022
66341a6
Merge remote-tracking branch 'origin/dev-0.2.0' into dev-0.2.0
May 16, 2022
874a34b
Complete the jobState entities and jobState Manager. Add FlinkTrigger…
Davidhua1996 May 17, 2022
94d7476
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 17, 2022
2e4f65c
Get the Checkpoint file information
May 18, 2022
e3379f5
Get the Checkpoint file information, update exception handling
May 18, 2022
30c8c7a
request parater validation
May 19, 2022
98b5ed3
Streamis scheduler module and Reconstruct the TaskService class (50%)
Davidhua1996 May 19, 2022
4024d88
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 19, 2022
7fec466
update Get the Checkpoint file information
May 20, 2022
b13216f
Resolve the JobStateResult.java compilation issues
May 20, 2022
3b840cd
Reconstruct the TaskService (100%); Bulk operation for job; Make the …
Davidhua1996 May 23, 2022
620c57c
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 23, 2022
e70a06f
feat: complete the i18n config
May 23, 2022
0fc7733
feat: complete the logic of batch restart
May 23, 2022
b3a99ac
feat: no need for jobid
May 23, 2022
c14560f
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
May 23, 2022
4a1b837
feat: the function of job list ready
May 24, 2022
317ac59
feat: config还有点问题,先遗留
May 24, 2022
90cd350
fix: the child_def array empty problem
May 24, 2022
67a09b0
feat: improve the functions comment by DH
May 25, 2022
b104ec9
feat: improve style
May 25, 2022
964ec7b
fix: logic error
May 25, 2022
5d842d6
fix: bugs
May 25, 2022
a838c0c
fix: splice bug
May 25, 2022
3c1d44e
feat: hide the extra checkbox
May 25, 2022
2a12edd
feat: multi key warning
May 25, 2022
753ec0b
feat: add savepoint cgi and change the button load
May 25, 2022
3e0c9d3
fix: remained bugs
May 26, 2022
fae8f0f
feat: snapshot path & logs history problems
May 26, 2022
3a5a34c
fix: several warnings
May 26, 2022
df36b33
feat: savepoint modal
May 26, 2022
2759cb2
fix: endline logic problem
May 26, 2022
2e89881
Strategy to fetch the JobState from Linkis server.
Davidhua1996 May 26, 2022
44bb659
Fix the compilation problem in modules.
Davidhua1996 May 26, 2022
b9f68fa
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 26, 2022
14b8e8b
feat: validate configs
May 26, 2022
c317559
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
May 26, 2022
cbf8e0f
feat: delete the extra logic
May 26, 2022
93197cd
fix: the cognition bugs
May 30, 2022
558fbf1
adjust appconn
May 30, 2022
7ed3412
init projectPrivilege
May 30, 2022
4a17a62
Merge remote-tracking branch 'origin/dev-0.2.0' into dev-0.2.0
May 30, 2022
dae49ba
Reconstruct the configuration module; Adjust the log4j2.xml
Davidhua1996 May 31, 2022
3615af9
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 May 31, 2022
78d6b03
Project Privilege Update
May 31, 2022
78bd170
Merge branch 'dev-0.2.0' of https://github.com/WeDataSphere/Streamis …
May 31, 2022
390979f
AppConn Update
Jun 2, 2022
2e4d054
Adjust the linkis.properties and log4j2.xml.
Davidhua1996 Jun 6, 2022
95111bf
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 Jun 6, 2022
ef06697
Store the linkis job id when the launched job has failed.
Davidhua1996 Jun 7, 2022
15d6bb0
AppConn related table update
Jun 8, 2022
acd945d
Flink the problem in JobRestfulApi; Enable to set flink application n…
Davidhua1996 Jun 8, 2022
1d92838
Upgrade the .gitignore file
Davidhua1996 Jun 8, 2022
742ea5d
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 Jun 8, 2022
cacedcd
privilege manage update
Jun 8, 2022
0c0141e
Merge branch 'dev-0.2.0' of https://github.com/WeDataSphere/Streamis …
Jun 8, 2022
aa83b42
resolving injection conflicts
Jun 8, 2022
b877448
project privilege change
Jun 10, 2022
e3822fc
project privilege change
Jun 10, 2022
ba3b8cc
project privilege change
Jun 13, 2022
5cc3c17
appconn init DDL update
Jun 13, 2022
e6e0cda
Adjust the stream job service and the strategy of data authority.
Davidhua1996 Jun 13, 2022
a7fbd89
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 Jun 13, 2022
c581adf
Resolve the conflict in Job service.
Davidhua1996 Jun 13, 2022
4e2f185
bug repair
Jun 13, 2022
26daf23
bug repair
Jun 13, 2022
239fe45
adjusting privilefe service
Jun 13, 2022
925f454
bug repair 277118
Jun 14, 2022
e45d65a
Modify the assembly configuration of streamis Appconn.
Davidhua1996 Jun 14, 2022
c6b7005
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Davidhua1996 Jun 14, 2022
303d32b
bug repair 277133
Jun 14, 2022
c32a610
Consider the situation of no matched configuration in web page
Davidhua1996 Apr 17, 2022
9d31291
Enable the developer to update/insert the configuration of users; Att…
Davidhua1996 Apr 17, 2022
1a8e49a
Adjust the streamis_ddl.sql
Davidhua1996 Apr 17, 2022
42ca2d5
Adjust the streamis_ddl.sql to accept large value of field
Davidhua1996 Apr 18, 2022
31666cb
Redefine the project file regex; Filter and replace the specific para…
Davidhua1996 May 5, 2022
e8b42bf
Roll the version to "0.2.0"; Change the dependency version of linkis …
Davidhua1996 May 10, 2022
ee26e7b
Init the streamis AppConn
Davidhua1996 May 10, 2022
ebc174b
add streamis Appconn
May 11, 2022
7316f17
update streamis Appconn
May 12, 2022
baddb04
Adjust dependency and structure of the job launcher module and add Jo…
Davidhua1996 May 13, 2022
020e871
Added switches for automatic restart of jobs and state recovery when …
Davidhua1996 Jun 16, 2022
38434d7
Adjust the notification message
Davidhua1996 Jun 16, 2022
30bb508
Avoid duplicate code
Davidhua1996 Jun 16, 2022
e6343da
Merge pull request #25 from WeDataSphere/dev-0.2.0-jobstate-interface
wushengyeyouya Jun 16, 2022
cb3ceb2
Reduce the duplicated dependencies of streamis-appconn
Davidhua1996 Jun 22, 2022
26a6464
update DeleteOperation and UpdateOperation Generic type
Jun 22, 2022
7fbf733
Revert "[Feature] JobState component in launcher module"
Davidhua1996 Jun 22, 2022
f66fd2e
adjust pom
Jun 22, 2022
5ef453c
Merge pull request #27 from WeBankFinTech/revert-25-dev-0.2.0-jobstat…
wushengyeyouya Jun 22, 2022
cbd43ae
Sync the latest ddl and dml sql for streamis
Davidhua1996 Jun 22, 2022
474b197
Remove the useless restful/mybatis configuration
Davidhua1996 Jun 23, 2022
e509121
Merge branch 'dev-0.2.0' of github.com:WeBankFinTech/Streamis into de…
Davidhua1996 Jun 23, 2022
9cf23b7
Merge remote-tracking branch 'wedata/dev-0.2.0-webank' into dev-0.2.0
Davidhua1996 Jun 23, 2022
d7f41ab
update init.sql ip port
Jun 23, 2022
c4e9799
Merge remote-tracking branch 'origin/dev-0.2.0-webank' into dev-0.2.0…
Jun 23, 2022
dc57a33
Merge branch 'dev-0.2.0-webank' into dev-0.2.0
Davidhua1996 Jun 23, 2022
21a8acf
Merge pull request #28 from WeDataSphere/dev-0.2.0
Davidhua1996 Jun 24, 2022
30a4351
Merge pull request #29 from WeBankFinTech/dev-0.2.0
wushengyeyouya Jun 24, 2022
85903aa
update init.sql IP PORT
Jun 24, 2022
a8c85db
StreamisAppConnDesignDocument
Jun 16, 2022
a0feda4
update web project version、 config.sh project version and project-ser…
Jun 25, 2022
ffe496c
Streamis安装文档
Jun 25, 2022
e2370de
Merge branch 'dev-0.2.0-webank' into dev-0.2.0
Jun 27, 2022
6a471fd
fix: the diy config problem
Jun 27, 2022
a1a5a59
Merge branch 'dev-0.2.0' of github.com:WeDataSphere/Streamis into dev…
Jun 27, 2022
5afe230
Merge pull request #30 from WeDataSphere/dev-0.2.0
Davidhua1996 Jun 27, 2022
7328b03
Remove StreamSQL development menu in DSS
Jun 27, 2022
4734eee
Optimize install.sh
Jun 27, 2022
e0ed4ce
Merge branch 'dev-0.2.0-webank' into dev-0.2.0
Jun 27, 2022
1c3535d
Optimize Streamis installation documentation
Jun 27, 2022
c1b5f85
Merge pull request #26 from WeDataSphere/dev-0.2.0-docs-appconn
Davidhua1996 Jun 28, 2022
45658a0
Solve the problem of no loss of Streamis Product Center
Jun 28, 2022
b37ad25
Merge branch 'dev-0.2.0-webank' into dev-0.2.0
Jun 28, 2022
efc73f2
update installation documentation
Jun 28, 2022
25cd526
Merge pull request #31 from WeBankFinTech/dev-0.2.0
Davidhua1996 Jun 29, 2022
037b0c5
Merge pull request #32 from WeDataSphere/dev-0.2.0
Davidhua1996 Jun 29, 2022
edf462a
Merge pull request #34 from WeBankFinTech/dev-0.2.0
Davidhua1996 Jun 29, 2022
75dc711
update insert dss_workspace_dictionary table
Jun 29, 2022
2db2847
Merge branch 'dev-0.2.0-webank' into dev-0.2.0
Jun 29, 2022
1e042ef
Merge pull request #35 from WeDataSphere/dev-0.2.0
Davidhua1996 Jun 29, 2022
bc7a52e
Merge pull request #36 from WeBankFinTech/dev-0.2.0
Davidhua1996 Jun 29, 2022
cd062ac
use document
Jun 30, 2022
d19be7c
Merge pull request #33 from WeDataSphere/dev-0.2.0-docs-appconn
Davidhua1996 Jul 1, 2022
f12b28a
Update English StreamisAppConnDesignDocument
Jul 1, 2022
3751197
add linkis1.1.2 support description
Jul 4, 2022
3aea2c5
Merge pull request #38 from WeDataSphere/dev-0.2.0-docs-appconn
Davidhua1996 Jul 4, 2022
81150dc
Enable the streamis to support different version of Linkis
Davidhua1996 Jul 4, 2022
72218d3
Merge branch 'dev-0.2.0' into dev-0.2.0-version-compatible
Davidhua1996 Jul 4, 2022
0efc59b
Change the format of linkis.properties
Davidhua1996 Jul 4, 2022
14a5aeb
Merge pull request #40 from WeDataSphere/dev-0.2.0-version-compatible
wushengyeyouya Jul 4, 2022
ea8d3d2
init upgrade document and update English pictures
Jul 4, 2022
106433b
update architecture picture
Jul 5, 2022
f70e82d
Merge pull request #41 from WeDataSphere/dev-0.2.0-docs-appconn
Davidhua1996 Jul 5, 2022
179fdf6
Merge pull request #42 from WeBankFinTech/dev-0.2.0
Davidhua1996 Jul 5, 2022
36b9ec7
Support the method to get engine conn resource information.
Davidhua1996 Oct 17, 2022
9c75477
Init the structure of log module.
Davidhua1996 Oct 17, 2022
0aa09cc
Add the new feature for adding task and stop task for existed jobs.
wushengyeyouya Oct 18, 2022
06c4962
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 18, 2022
0b44316
Use the "\u0001" instead of "\0x001" to act as blank placeholder.
Davidhua1996 Oct 18, 2022
7dea7f6
SendBuffer and BucketConfig.
Davidhua1996 Oct 18, 2022
343eb2b
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 19, 2022
1945775
Add the new feature for adding task and stop task for existed jobs.
wushengyeyouya Oct 19, 2022
41da9e5
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 19, 2022
6c6d50d
Complete the log collector (80%).
Davidhua1996 Oct 19, 2022
6814106
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 19, 2022
8b1bab8
fix the bug for adding task for existed jobs.
wushengyeyouya Oct 20, 2022
40588d3
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
77f616c
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
5d274ab
RPC(Http) module classes in collector
Davidhua1996 Oct 20, 2022
223c590
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
1165614
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
5af5289
1. Add the ability for updating task for existed jobs.
wushengyeyouya Oct 20, 2022
75a6ffb
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 20, 2022
e32b3e0
The strategy to retry the request and compact the buffer in log colle…
Davidhua1996 Oct 20, 2022
718227a
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 20, 2022
bd3ced3
Complete the log collector (95%)
Davidhua1996 Oct 20, 2022
7574928
Fix the problem in send buffer and consumer.
Davidhua1996 Oct 21, 2022
adc0124
Optimize the architecture of fetch Job details.
wushengyeyouya Oct 21, 2022
d354c5c
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 21, 2022
96f90d8
Add config autowired interface to set the log appender params.
Davidhua1996 Oct 21, 2022
cdf0468
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 21, 2022
0419c6d
fix the compilation bug.
wushengyeyouya Oct 21, 2022
9e648ba
fix the long to int parse bug.
wushengyeyouya Oct 21, 2022
ce152b8
Complete the log collector (100%)
Davidhua1996 Oct 22, 2022
ef5b0a3
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 22, 2022
0790ed9
Add Flink log collector and the transform in streamis which resolves …
Davidhua1996 Oct 22, 2022
ab64563
Complete stream log server to store the log events.
Davidhua1996 Oct 22, 2022
3979625
Fix the problem in job builder.
Davidhua1996 Oct 23, 2022
6eb95c2
Move the internal group config into JobConfKeyConstants.
Davidhua1996 Oct 23, 2022
1adf7d7
Move the internal group config into JobConfKeyConstants.
Davidhua1996 Oct 23, 2022
c07a5c2
Make the constraints in transforms be configurable.
Davidhua1996 Oct 23, 2022
0494533
Fix serialization and configuration problem.
Davidhua1996 Oct 23, 2022
977ba31
Optimize the update task and stop task logic.
wushengyeyouya Oct 24, 2022
2403de8
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 24, 2022
cf4acfa
Fix the problem in fetching user info when the flink application does…
Davidhua1996 Oct 24, 2022
8633652
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 24, 2022
05744cb
Add new strategy named ThresholdFilter
Davidhua1996 Oct 25, 2022
3427e13
Fix the problem when closing bucket.
Davidhua1996 Oct 25, 2022
641dc10
Split the log collector into log4j1 and log4j2; Avoid the jackson mod…
Davidhua1996 Oct 25, 2022
d9e9179
Add module to collect spark container log.
Davidhua1996 Oct 26, 2022
5f4de3a
Add the new requestBody parameter `projectName` for adding task and s…
wushengyeyouya Oct 26, 2022
f90542a
Add the new requestBody parameter `projectName` for adding task and s…
wushengyeyouya Oct 26, 2022
461232f
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 26, 2022
bbb0887
Complete the strategy in JsonTool.escapeStrValue().
Davidhua1996 Oct 26, 2022
68a9358
resolve 413 response status for update task.
wushengyeyouya Oct 29, 2022
111881e
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 29, 2022
c95af1b
resolve 413 response status for update task.
wushengyeyouya Oct 29, 2022
e474e29
Fix the in method restoreInvariants of AbstractRpcLogSender.
Davidhua1996 Oct 29, 2022
447440e
Merge branch 'dev-0.2.3-log-collector' of github.com:WeDataSphere/Str…
Davidhua1996 Oct 29, 2022
35bbdd0
resolve 413 response status for update task.
wushengyeyouya Oct 31, 2022
bf06bd1
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Oct 31, 2022
4cda151
adjust the relation path in pom.xml.
Davidhua1996 Nov 7, 2022
6a07287
Fix the problem in pom.xml of stream job log modules.
Davidhua1996 Nov 7, 2022
a0e59b7
resolve the LinkisJobInfo is null for adding task.
wushengyeyouya Nov 15, 2022
1d933a6
Merge remote-tracking branch 'origin/dev-0.2.3-log-collector' into de…
wushengyeyouya Nov 15, 2022
fe00dff
Merge pull request #54 from WeDataSphere/dev-0.2.3-log-collector
jefftlin Dec 15, 2022
95e6643
Upgrade the version of pom to 0.2.3 (#55)
jefftlin Dec 20, 2022
0e98bab
Use the "flink.yarn.ship-directories" instead of "flink.app.user.clas…
Davidhua1996 Dec 20, 2022
00c93e8
Merge pull request #57 from WeDataSphere/dev-0.2.3-bugfix
jefftlin Dec 20, 2022
ced5eff
Upgrade the version of pom to 0.2.4 (#58)
jefftlin Dec 20, 2022
1d33075
Optimization of log collector module (#60)
Davidhua1996 Dec 20, 2022
5ccce72
Upgrade the version of pom to 0.2.5
Dec 20, 2022
f5422bb
Add version upgrade sql
Feb 23, 2023
69848df
update upgrade sql
Feb 24, 2023
85ceac0
Update version of docs to 0.2.5
Feb 27, 2023
996a706
Update user usage document
Mar 1, 2023
894d49e
Update upgrade document;Adjust directory level
Mar 1, 2023
086cac3
Upgrade linkis version to 1.1.6-webank
Mar 3, 2023
48e2043
Upgrade snakeyaml version to 1.31
Mar 3, 2023
fe121ec
Adjust the order of alert user
Mar 21, 2023
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Add Flink log collector and the transform in streamis which resolves …
…the internal configuration
Davidhua1996 committed Oct 22, 2022
commit 0790ed94555dbdba209d76551f8316ed64cfe90b
1 change: 1 addition & 0 deletions pom.xml
Original file line number Diff line number Diff line change
@@ -46,6 +46,7 @@

<properties>
<linkis.version>1.1.3</linkis.version>
<junit.version>4.12</junit.version>
<dss.version>1.1.0</dss.version>
<streamis.version>0.2.0</streamis.version>
<scala.version>2.11.12</scala.version>
Original file line number Diff line number Diff line change
@@ -16,13 +16,80 @@
<maven.compiler.target>8</maven.compiler.target>
<!-- The flink version dependent-->
<flink.version>1.12.2</flink.version>
<log4j.version>2.17.1</log4j.version>
<slf4j.version>1.7.15</slf4j.version>
</properties>
<dependencies>
<dependency>
<groupId>com.webank.wedatasphere.streamis</groupId>
<artifactId>streamis-job-log-collector</artifactId>
<version>${streamis.version}</version>
</dependency>
<!-- flink basic dependencies-->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-yarn_2.11</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<!--Junit-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
<!--log4j2-->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<id>assemble</id>
<goals>
<goal>single</goal>
</goals>
<!-- install -->
<phase>install</phase>
</execution>
</executions>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<appendAssemblyId>false</appendAssemblyId>
</configuration>
</plugin>
</plugins>
</build>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">

<id>package</id>

<formats>
<format>jar</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<dependencySets>
<dependencySet>
<outputDirectory>/</outputDirectory>
<unpack>true</unpack>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;

import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.logging.log4j.Level;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.filter.LevelMatchFilter;
import org.apache.logging.log4j.core.filter.RegexFilter;

import java.util.Enumeration;
import java.util.List;
import java.util.Properties;

import static com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigDefine.*;

/**
* Autoconfigure the streamis config inf Flink environment
*/
public class FlinkStreamisConfigAutowired implements StreamisConfigAutowired {

/**
* Flink configuration
*/
private Configuration configuration;

public FlinkStreamisConfigAutowired(){
// First to load configuration
this.configuration = loadConfiguration();
}
@Override
public StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception{
String applicationName = this.configuration.getString(YarnConfigOptions.APPLICATION_NAME);
if (StringUtils.isNotBlank(applicationName)){
builder.setAppName(applicationName);
}
String gateway = this.configuration.getString(LOG_GATEWAY_ADDRESS);
if (StringUtils.isNotBlank(gateway)){
if (gateway.endsWith("/")){
gateway = gateway.substring(0, gateway.length() - 1);
}
gateway += this.configuration.getString(LOG_COLLECT_PATH, "/");
builder.setRpcAddress(gateway);
}
List<String> filterStrategies = this.configuration.get(LOG_FILTER_STRATEGIES);
for(String filterStrategy : filterStrategies){
if ("LevelMatch".equals(filterStrategy)){
builder.withFilter(LevelMatchFilter.newBuilder()
.setLevel(Level.getLevel(this.configuration.getString(LOG_FILTER_LEVEL_MATCH))).build());
} else if ("RegexMatch".equals(filterStrategy)){
builder.withFilter(RegexFilter.createFilter( this.configuration.getString(LOG_FILTER_REGEX),
null, true, Filter.Result.ACCEPT, Filter.Result.DENY));
}
}
return builder.setRpcConnTimeout(this.configuration.getInteger(LOG_RPC_CONN_TIMEOUT))
.setRpcSocketTimeout(this.configuration.getInteger(LOG_RPC_SOCKET_TIMEOUT))
.setRpcSendRetryCnt(this.configuration.getInteger(LOG_RPC_SEND_RETRY_COUNT))
.setRpcServerRecoveryTimeInSec(this.configuration.getInteger(LOG_RPC_SERVER_RECOVERY_TIME))
.setRpcMaxDelayTimeInSec(this.configuration.getInteger(LOG_RPC_MAX_DELAY_TIME))
.setRpcAuthTokenCodeKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE_KEY))
.setRpcAuthTokenUserKey(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER_KEY))
.setRpcAuthTokenCode(this.configuration.getString(LOG_RPC_AUTH_TOKEN_CODE))
.setRpcAuthTokenUser(this.configuration.getString(LOG_RPC_AUTH_TOKEN_USER))
.setRpcCacheSize(this.configuration.getInteger(LOG_RPC_CACHE_SIZE))
.setRpcCacheMaxConsumeThread(this.configuration.getInteger(LOG_PRC_CACHE_MAX_CONSUME_THREAD))
.setRpcBufferSize(this.configuration.getInteger(LOG_RPC_BUFFER_SIZE))
.setRpcBufferExpireTimeInSec(this.configuration.getInteger(LOG_RPC_BUFFER_EXPIRE_TIME)).build();
}

/**
* According to :
* String launchCommand =
* BootstrapTools.getTaskManagerShellCommand(
* flinkConfig,
* tmParams,
* ".",
* ApplicationConstants.LOG_DIR_EXPANSION_VAR,
* hasLogback,
* hasLog4j,
* hasKrb5,
* taskManagerMainClass,
* taskManagerDynamicProperties);
* the configuration directory of Flink yarn container is always ".",
* @return configuration
*/
private Configuration loadConfiguration(){
String configDir = System.getenv("FLINK_CONF_DIR");
if (null == configDir){
configDir = ".";
}
Properties properties = System.getProperties();
Enumeration<?> enumeration = properties.propertyNames();
Configuration dynamicConfiguration = new Configuration();
while(enumeration.hasMoreElements()){
String prop = String.valueOf(enumeration.nextElement());
dynamicConfiguration.setString(prop, properties.getProperty(prop));
}
return GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;

import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;

import java.util.List;

/**
* Config definition
*/
public class FlinkStreamisConfigDefine {

/**
* Gateway address of log module for streamis
*/
public static final ConfigOption<String> LOG_GATEWAY_ADDRESS = ConfigOptions.key("stream.log.gateway.address")
.stringType().noDefaultValue().withDescription("The gateway address ex: http://127.0.0.1:8080");

/**
* Entrypoint path of collecting log
*/
public static final ConfigOption<String> LOG_COLLECT_PATH = ConfigOptions.key("stream.log.collect.path")
.stringType().defaultValue("/api/rest_j/v1/streamis/streamJobManager/log/collect/events").withDescription("The entrypoint path of collecting log");

/**
* Connection timeout(in milliseconds) in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_CONN_TIMEOUT = ConfigOptions.key("stream.log.rpc.connect-timeout")
.intType().defaultValue(3000).withDescription("Connection timeout(ms) in log RPC module");

/**
* Socket timeout(in milliseconds) in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_SOCKET_TIMEOUT = ConfigOptions.key("stream.log.rpc.socket-timeout")
.intType().defaultValue(15000).withDescription("Socket timeout(ms) in log RPC module");

/**
* Max retry count of sending message in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_SEND_RETRY_COUNT = ConfigOptions.key("stream.log.rpc.send-retry-count")
.intType().defaultValue(3).withDescription("Max retry count of sending message in log RPC module");

/**
* Server recovery time(in seconds) in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_SERVER_RECOVERY_TIME = ConfigOptions.key("stream.log.rpc.server-recovery-time-in-sec")
.intType().defaultValue(5).withDescription("Server recovery time(sec) in log RPC module");

/**
* Max delay time(in seconds) in log RPC module. if reach the limit, the message will be dropped
*/
public static final ConfigOption<Integer> LOG_RPC_MAX_DELAY_TIME = ConfigOptions.key("stream.log.rpc.max-delay-time")
.intType().defaultValue(60).withDescription("Max delay time(sec) in log RPC module");

/**
* Token code key in log RPC auth module
*/
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_CODE_KEY = ConfigOptions.key("stream.log.rpc.auth.token-code-key")
.stringType().defaultValue("Token-Code").withDescription("Token code key in log RPC auth module");

/**
* Token user key in log RPC auth module
*/
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_USER_KEY = ConfigOptions.key("stream.log.rpc.auth.token-user-key")
.stringType().defaultValue("Token-User").withDescription("Token user key in log RPC auth module");

/**
* Token code in log RPC auth module
*/
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_CODE = ConfigOptions.key("stream.log.rpc.auth.token-code")
.stringType().defaultValue("STREAM-LOG").withDescription("Token code in log RPC auth module");

/**
* Token user in log RPC auth module
*/
public static final ConfigOption<String> LOG_RPC_AUTH_TOKEN_USER = ConfigOptions.key("stream.log.rpc.auth.token-user")
.stringType().defaultValue(System.getProperty("user.name")).withDescription("Token user in log RPC auth module");

/**
* Cache size in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_CACHE_SIZE = ConfigOptions.key("stream.log.rpc.cache.size")
.intType().defaultValue(150).withDescription("Cache size in log RPC module");

/**
* Max cache consume threads in log RPC module
*/
public static final ConfigOption<Integer> LOG_PRC_CACHE_MAX_CONSUME_THREAD = ConfigOptions.key("stream.log.rpc.cache.max-consume-thread")
.intType().defaultValue(10).withDescription("Max cache consume threads in log RPC module");

/**
* Buffer size in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_BUFFER_SIZE = ConfigOptions.key("stream.log.rpc.buffer.size")
.intType().defaultValue(50).withDescription("Buffer size in log RPC module");

/**
* Buffer expire time(sec) in log RPC module
*/
public static final ConfigOption<Integer> LOG_RPC_BUFFER_EXPIRE_TIME = ConfigOptions.key("stream.log.rpc.buffer.expire-time-in-sec")
.intType().defaultValue(2).withDescription("Buffer expire time (sec) in log RPC module");

/**
* Log filter strategy list
*/
public static final ConfigOption<List<String>> LOG_FILTER_STRATEGIES = ConfigOptions.key("stream.log.filter.strategies")
.stringType().asList().defaultValues("LevelMatch").withDescription("Log filter strategy list");

/**
* Level value of LevelMatch filter strategy
*/
public static final ConfigOption<String> LOG_FILTER_LEVEL_MATCH = ConfigOptions.key("stream.log.filter.level-match.level")
.stringType().defaultValue("ERROR").withDescription("Level value of LevelMatch filter strategy");

/**
* Regex value of RegexMatch filter strategy
*/
public static final ConfigOption<String> LOG_FILTER_REGEX = ConfigOptions.key("stream.log.filter.regex.value")
.stringType().defaultValue(".*").withDescription("Regex value of RegexMatch filter strategy");
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
com.webank.wedatasphere.streamis.jobmanager.log.collector.flink.FlinkStreamisConfigAutowired
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector.flink;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;
import org.junit.Test;

import java.io.File;
import java.util.Enumeration;
import java.util.Objects;
import java.util.Properties;

public class FlinkConfigurationLoadTest {
@Test
public void loadConfiguration() {
String configDir = Objects.requireNonNull(FlinkConfigurationLoadTest.class.getResource("/")).getFile();
Properties properties = System.getProperties();
Enumeration<?> enumeration = properties.propertyNames();
Configuration dynamicConfiguration = new Configuration();
while(enumeration.hasMoreElements()){
String prop = String.valueOf(enumeration.nextElement());
dynamicConfiguration.setString(prop, properties.getProperty(prop));
}
GlobalConfiguration.loadConfiguration(configDir, dynamicConfiguration);
}
}
Original file line number Diff line number Diff line change
@@ -24,7 +24,7 @@
<dependency>
<groupId>com.webank.wedatasphere.streamis</groupId>
<artifactId>streamis-job-log-common</artifactId>
<version>0.2.0</version>
<version>${streamis.version}</version>
</dependency>
<!--http client module-->
<dependency>
@@ -65,5 +65,12 @@
<version>${log4j.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -4,7 +4,9 @@
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.RpcLogSenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.StreamisRpcLogSender;
import com.webank.wedatasphere.streamis.jobmanager.log.collector.sender.http.Json;
import com.webank.wedatasphere.streamis.jobmanager.log.entities.StreamisLogEvent;
import com.webank.wedatasphere.streamis.jobmanager.plugin.StreamisConfigAutowired;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
import org.apache.logging.log4j.core.LogEvent;
@@ -27,9 +29,7 @@
*/
@Plugin(name = "StreamRpcLog", category = "Core", elementType = "appender", printObject = true)
public class StreamisRpcLogAppender extends AbstractAppender {

private static final String DEFAULT_APPENDER_NAME = "StreamRpcLog";

/**
* Appender config
*/
@@ -52,6 +52,8 @@ protected StreamisRpcLogAppender(String name, Filter filter,
this.appenderConfig = appenderConfig;
this.rpcLogSender = new StreamisRpcLogSender(this.appenderConfig.getApplicationName(),
this.appenderConfig.getSenderConfig());
this.rpcLogSender.setExceptionListener((subject, t, message) ->
LOGGER.error((null != subject? subject.getClass().getSimpleName() : "") + ": " + message, t));
this.logCache = this.rpcLogSender.getOrCreateLogCache();
Runtime.getRuntime().addShutdownHook(new Thread(this.rpcLogSender::close));
}
@@ -74,7 +76,7 @@ public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") Str
@PluginAttribute("ignoreExceptions") boolean ignoreExceptions,
@PluginElement("Filter") final Filter filter,
@PluginElement("Layout") Layout<? extends Serializable> layout,
@PluginElement("RpcLogSender")RpcLogSenderConfig rpcLogSenderConfig){
@PluginElement("RpcLogSender")RpcLogSenderConfig rpcLogSenderConfig) throws Exception{
if (null == name || name.trim().equals("")){
name = DEFAULT_APPENDER_NAME;
}
@@ -97,6 +99,7 @@ public static StreamisRpcLogAppender createAppender(@PluginAttribute("name") Str
if (null == applicationName || applicationName.trim().equals("")){
throw new IllegalArgumentException("Application name cannot be empty");
}
LOGGER.info("StreamisRpcLogAppender: init with config {}", Json.toJson(logAppenderConfig, null));
return new StreamisRpcLogAppender(name, filter, layout, ignoreExceptions, Property.EMPTY_ARRAY, logAppenderConfig);
}

Original file line number Diff line number Diff line change
@@ -56,7 +56,7 @@ public Builder(String applicationName, Filter filter,
* @param applicationName application name
* @return builder
*/
StreamisLogAppenderConfig.Builder setAppName(String applicationName){
public StreamisLogAppenderConfig.Builder setAppName(String applicationName){
this.applicationName = applicationName;
return this;
}
@@ -66,7 +66,7 @@ StreamisLogAppenderConfig.Builder setAppName(String applicationName){
* @param filter filter
* @return builder
*/
StreamisLogAppenderConfig.Builder setFilter(Filter filter){
public StreamisLogAppenderConfig.Builder setFilter(Filter filter){
this.filters.clear();
this.filters.add(filter);
return this;
@@ -77,17 +77,27 @@ StreamisLogAppenderConfig.Builder setFilter(Filter filter){
* @param filter filter
* @return builder
*/
StreamisLogAppenderConfig.Builder withFilter(Filter filter){
public StreamisLogAppenderConfig.Builder withFilter(Filter filter){
filters.add(filter);
return this;
}

/**
* Rpc address
* @param address address
* @return builder
*/
public StreamisLogAppenderConfig.Builder setRpcAddress(String address){
this.rpcLogSenderConfig.setAddress(address);
return this;
}

/**
* Rpc connect timeout
* @param connectionTimeout connection timeout
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcConnTimeout(int connectionTimeout){
public StreamisLogAppenderConfig.Builder setRpcConnTimeout(int connectionTimeout){
this.rpcLogSenderConfig.setConnectionTimeout(connectionTimeout);
return this;
}
@@ -97,7 +107,7 @@ StreamisLogAppenderConfig.Builder setRpcConnTimeout(int connectionTimeout){
* @param socketTimeout socket timeout
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcSocketTimeout(int socketTimeout){
public StreamisLogAppenderConfig.Builder setRpcSocketTimeout(int socketTimeout){
this.rpcLogSenderConfig.setSocketTimeout(socketTimeout);
return this;
}
@@ -107,7 +117,7 @@ StreamisLogAppenderConfig.Builder setRpcSocketTimeout(int socketTimeout){
* @param sendRetryCnt send retry count
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcSendRetryCnt(int sendRetryCnt){
public StreamisLogAppenderConfig.Builder setRpcSendRetryCnt(int sendRetryCnt){
this.rpcLogSenderConfig.setSendRetryCnt(sendRetryCnt);
return this;
}
@@ -117,7 +127,7 @@ StreamisLogAppenderConfig.Builder setRpcSendRetryCnt(int sendRetryCnt){
* @param serverRecoveryTimeInSec server recovery time
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcServerRecoveryTimeInSec(int serverRecoveryTimeInSec){
public StreamisLogAppenderConfig.Builder setRpcServerRecoveryTimeInSec(int serverRecoveryTimeInSec){
this.rpcLogSenderConfig.setServerRecoveryTimeInSec(serverRecoveryTimeInSec);
return this;
}
@@ -127,7 +137,7 @@ StreamisLogAppenderConfig.Builder setRpcServerRecoveryTimeInSec(int serverRecove
* @param maxDelayTimeInSec max delay time in seconds
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcMaxDelayTimeInSec(int maxDelayTimeInSec){
public StreamisLogAppenderConfig.Builder setRpcMaxDelayTimeInSec(int maxDelayTimeInSec){
this.rpcLogSenderConfig.setMaxDelayTimeInSec(maxDelayTimeInSec);
return this;
}
@@ -137,7 +147,7 @@ StreamisLogAppenderConfig.Builder setRpcMaxDelayTimeInSec(int maxDelayTimeInSec)
* @param tokenCodeKey key of token code
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcAuthTokenCodeKey(String tokenCodeKey){
public StreamisLogAppenderConfig.Builder setRpcAuthTokenCodeKey(String tokenCodeKey){
this.rpcLogSenderConfig.getAuthConfig().setTokenCodeKey(tokenCodeKey);
return this;
}
@@ -147,7 +157,7 @@ StreamisLogAppenderConfig.Builder setRpcAuthTokenCodeKey(String tokenCodeKey){
* @param tokenUserKey key of token user
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcAuthTokenUserKey(String tokenUserKey){
public StreamisLogAppenderConfig.Builder setRpcAuthTokenUserKey(String tokenUserKey){
this.rpcLogSenderConfig.getAuthConfig().setTokenUserKey(tokenUserKey);
return this;
}
@@ -157,7 +167,7 @@ StreamisLogAppenderConfig.Builder setRpcAuthTokenUserKey(String tokenUserKey){
* @param tokenUser token user
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcAuthTokenUser(String tokenUser){
public StreamisLogAppenderConfig.Builder setRpcAuthTokenUser(String tokenUser){
this.rpcLogSenderConfig.getAuthConfig().setTokenUser(tokenUser);
return this;
}
@@ -167,7 +177,7 @@ StreamisLogAppenderConfig.Builder setRpcAuthTokenUser(String tokenUser){
* @param tokenCode token code
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcAuthTokenCode(String tokenCode){
public StreamisLogAppenderConfig.Builder setRpcAuthTokenCode(String tokenCode){
this.rpcLogSenderConfig.getAuthConfig().setTokenCode(tokenCode);
return this;
}
@@ -177,7 +187,7 @@ StreamisLogAppenderConfig.Builder setRpcAuthTokenCode(String tokenCode){
* @param cacheSize cache size
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcCacheSize(int cacheSize){
public StreamisLogAppenderConfig.Builder setRpcCacheSize(int cacheSize){
this.rpcLogSenderConfig.getCacheConfig().setSize(cacheSize);
return this;
}
@@ -187,7 +197,7 @@ StreamisLogAppenderConfig.Builder setRpcCacheSize(int cacheSize){
* @param maxConsumeThread max consume thread
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcCacheMaxConsumeThread(int maxConsumeThread){
public StreamisLogAppenderConfig.Builder setRpcCacheMaxConsumeThread(int maxConsumeThread){
this.rpcLogSenderConfig.getCacheConfig().setMaxConsumeThread(maxConsumeThread);
return this;
}
@@ -197,7 +207,7 @@ StreamisLogAppenderConfig.Builder setRpcCacheMaxConsumeThread(int maxConsumeThre
* @param bufferSize buffer size
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcBufferSize(int bufferSize){
public StreamisLogAppenderConfig.Builder setRpcBufferSize(int bufferSize){
this.rpcLogSenderConfig.getBufferConfig().setSize(bufferSize);
return this;
}
@@ -207,7 +217,7 @@ StreamisLogAppenderConfig.Builder setRpcBufferSize(int bufferSize){
* @param expireTimeInSec expire time
* @return builder
*/
StreamisLogAppenderConfig.Builder setRpcBufferExpireTimeInSec(int expireTimeInSec){
public StreamisLogAppenderConfig.Builder setRpcBufferExpireTimeInSec(int expireTimeInSec){
this.rpcLogSenderConfig.getBufferConfig().setExpireTimeInSec(expireTimeInSec);
return this;
}
Original file line number Diff line number Diff line change
@@ -87,6 +87,7 @@ public void run() {
if (this.isTerminated && e instanceof InterruptedException){
return;
} else {
e.printStackTrace();
System.err.println("SendLogCacheConsumer[" + Thread.currentThread().getName() + "] occurred exception [" + e.getLocalizedMessage() + "]");
// For the unknown exception clear the cache
sendBuffer.clear();
Original file line number Diff line number Diff line change
@@ -91,6 +91,24 @@ public static String toJson(Object obj, Class<?> model) {
return null;
}

public static String toJson(Object obj, Class<?> model, boolean beautify){
ObjectWriter writer = mapper.writer();
if(null != obj){
try{
if(null != model){
writer = writer.withView(model);
}
if(beautify){
return writer.withDefaultPrettyPrinter().writeValueAsString(obj);
}
return writer.writeValueAsString(obj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
return null;
}

/**
* Convert object using serialization and deserialization
*
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector;
package com.webank.wedatasphere.streamis.jobmanager.plugin;

import com.webank.wedatasphere.streamis.jobmanager.log.collector.config.StreamisLogAppenderConfig;

@@ -11,5 +11,5 @@ public interface StreamisConfigAutowired {
* Log appender config
* @param builder builder
*/
StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder);
StreamisLogAppenderConfig logAppenderConfig(StreamisLogAppenderConfig.Builder builder) throws Exception;
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,28 @@
package com.webank.wedatasphere.streamis.jobmanager.log.collector;

import org.apache.logging.log4j.LogManager;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class StreamisLogAppenderTest {
private static final Logger LOG = LoggerFactory.getLogger(StreamisLogAppenderTest.class);
public static void main(String[] args) throws InterruptedException {
while(true){
for(int i = 0; i < 100; i ++){
LOG.info("Stream Log appender test");
@Test
public void appenderLog() throws InterruptedException {
org.apache.logging.log4j.core.Logger logger = (org.apache.logging.log4j.core.Logger) LogManager.getLogger("Test: Hello-world");
logger.info("");
int total = 1000;
int tps = 100;
long timer = System.currentTimeMillis() + 1000;
for(int i = 0; i < total; i ++){
if (i > 0 && i % tps == 0){
long sleep = timer - System.currentTimeMillis();
if (sleep > 0){
Thread.sleep(sleep);
}
timer = System.currentTimeMillis() + 1000;
}
Thread.sleep(1000);
LOG.info("Stream Log appender test, sequence id: " + i);
}
}
}
Original file line number Diff line number Diff line change
@@ -21,13 +21,19 @@ public StreamisLogEvents(String applicationName, StreamisLogEvent[] events){
this.appName = applicationName;
this.events = events;
long maxTime = -1;
for(StreamisLogEvent event : events){
long time = event.getLogTimeStamp();
if (time > maxTime){
maxTime = time;
StreamisLogEvent lastEvent = events[events.length - 1];
if (null == lastEvent) {
for (StreamisLogEvent event : events) {
long time = event.getLogTimeStamp();
if (time > maxTime) {
maxTime = time;
}
}
this.logTimeInMills = maxTime;
}else {
this.logTimeInMills = lastEvent.getLogTimeStamp();
}
this.logTimeInMills = maxTime;

}

@Override
Original file line number Diff line number Diff line change
@@ -15,7 +15,7 @@

package com.webank.wedatasphere.streamis.jobmanager.manager.conf

import org.apache.linkis.common.conf.{CommonVars, TimeType}
import org.apache.linkis.common.conf.{CommonVars, Configuration, TimeType}
import com.webank.wedatasphere.streamis.jobmanager.manager.exception.JobExecuteErrorException


@@ -29,6 +29,21 @@ object JobConf {

val STREAMIS_JOB_PARAM_BLANK_PLACEHOLDER: CommonVars[String] = CommonVars("wds.streamis.job.param.blank.placeholder", "\u0001")

/**
* Config group for streamis internal configuration
*/
val STREAMIS_INTERNAL_CONFIG_GROUP: CommonVars[String] = CommonVars("wds.streamis.job.internal.config.group", "wds.streamis.internal.params")

/**
* Gateway for stream job log module
*/
val STREAMIS_JOB_LOG_GATEWAY: CommonVars[String] = CommonVars("wds.streamis.job.log.gateway", Configuration.getGateWayURL())

/**
* Path for collecting stream job log
*/
val STREAMIS_JOB_LOG_COLLECT_PATH: CommonVars[String] = CommonVars("wds.streamis.job.log.collect.path", "/api/rest_j/v1/streamis/streamJobManager/log/collect/events")

val FLINK_JOB_STATUS_NOT_STARTED: CommonVars[Int] = CommonVars("wds.streamis.job.status.not-started", 0,"Not Started")

val FLINK_JOB_STATUS_COMPLETED: CommonVars[Int] = CommonVars("wds.streamis.job.status.completed", 1,"Completed")
Original file line number Diff line number Diff line change
@@ -18,12 +18,15 @@ package com.webank.wedatasphere.streamis.jobmanager.manager.transform.builder
import org.apache.linkis.common.conf.CommonVars
import org.apache.linkis.manager.label.entity.engine.RunType.RunType
import com.webank.wedatasphere.streamis.jobmanager.launcher.service.StreamJobConfService
import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf
import com.webank.wedatasphere.streamis.jobmanager.manager.dao.StreamJobMapper
import com.webank.wedatasphere.streamis.jobmanager.manager.entity.StreamJob
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.StreamisTransformJobBuilder
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.entity.{StreamisJobEngineConnImpl, StreamisTransformJob, StreamisTransformJobContent, StreamisTransformJobImpl}
import org.springframework.beans.factory.annotation.Autowired

import java.util
import scala.collection.JavaConverters.mapAsJavaMapConverter
/**
* Created by enjoyyin on 2021/9/22.
*/
@@ -39,7 +42,12 @@ abstract class AbstractStreamisTransformJobBuilder extends StreamisTransformJobB
override def build(streamJob: StreamJob): StreamisTransformJob = {
val transformJob = createStreamisTransformJob()
transformJob.setStreamJob(streamJob)
transformJob.setConfigMap(streamJobConfService.getJobConfig(streamJob.getId))
val jobConfig: util.Map[String, Any] = Option(streamJobConfService.getJobConfig(streamJob.getId))
.getOrElse(new util.HashMap[String, Any]())
// Put and overwrite internal group, users cannot customize the internal configuration
jobConfig.put(JobConf.STREAMIS_INTERNAL_CONFIG_GROUP.getValue, new util.HashMap[String, Any]())
internalLogConfig(jobConfig)
transformJob.setConfigMap(jobConfig)
// transformJob.setConfig(configurationService.getFullTree(streamJob.getId))
val streamJobVersions = streamJobMapper.getJobVersions(streamJob.getId)
// 无需判断streamJobVersions是否非空,因为TaskService已经判断了
@@ -48,6 +56,14 @@ abstract class AbstractStreamisTransformJobBuilder extends StreamisTransformJobB
transformJob
}

/**
* Log internal configuration
* @param internal internal config group
*/
private def internalLogConfig(internal: util.Map[String, Any]): Unit = {
internal.put(JobConf.STREAMIS_JOB_LOG_GATEWAY.key, JobConf.STREAMIS_JOB_LOG_GATEWAY.getValue)
internal.put(JobConf.STREAMIS_JOB_LOG_COLLECT_PATH.key, JobConf.STREAMIS_JOB_LOG_COLLECT_PATH.getValue)
}
}

abstract class AbstractFlinkStreamisTransformJobBuilder extends AbstractStreamisTransformJobBuilder{
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
package com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl
import com.webank.wedatasphere.streamis.jobmanager.launcher.job.LaunchJob
import com.webank.wedatasphere.streamis.jobmanager.manager.conf.JobConf
import com.webank.wedatasphere.streamis.jobmanager.manager.transform.impl.FlinkInternalConfigTransform.INTERNAL_CONFIG_MAP
import org.apache.linkis.common.conf.CommonVars

import java.util
import scala.collection.JavaConverters.{mapAsJavaMapConverter, mapAsScalaMapConverter}

/**
* Flink internal config transform
*/
class FlinkInternalConfigTransform extends FlinkConfigTransform {

/**
* Config group name
*
* @return
*/
override protected def configGroup(): String = JobConf.STREAMIS_INTERNAL_CONFIG_GROUP.getValue

override protected def transform(internalConfig: util.Map[String, Any], job: LaunchJob): LaunchJob = {
transformConfig(internalConfig.asScala.map{
case (key, value) =>
(FlinkConfigTransform.FLINK_CONFIG_PREFIX + (INTERNAL_CONFIG_MAP.get(key) match {
case Some(mappingKey) => mappingKey
case _ => value
}), value)
}.asJava, job)
}
}

object FlinkInternalConfigTransform {

private val FLINK_LOG_GATEWAY_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-gateway", "stream.log.gateway.address").getValue

private val FLINK_LOG_COLLECT_PATH_CONFIG_NAME = CommonVars("wds.streamis.flink.config.name.log-collect-path", "stream.log.collect.path").getValue

val INTERNAL_CONFIG_MAP = Map(JobConf.STREAMIS_JOB_LOG_GATEWAY.key -> FLINK_LOG_GATEWAY_CONFIG_NAME,
JobConf.STREAMIS_JOB_LOG_COLLECT_PATH.key -> FLINK_LOG_COLLECT_PATH_CONFIG_NAME
)
}