[ISSUE #4635] Implemented the functions of file source connector #4650

Merged
@@ -21,8 +21,10 @@

import lombok.Data;
import lombok.EqualsAndHashCode;
import lombok.extern.slf4j.Slf4j;

@Data
@Slf4j
@EqualsAndHashCode(callSuper = true)
Member:
Where are these two annotations used?

public class FileSourceConfig extends SourceConfig {

@@ -29,4 +29,9 @@ public class SourceConnectorConfig {
    private String topic;

    private long commitOffsetIntervalMs = 5 * 1000;

    private String fileName;

    private String filePath;

}
@@ -23,9 +23,22 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordOffset;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Calendar;
import java.util.List;
import java.util.Locale;

import lombok.extern.slf4j.Slf4j;

@@ -36,6 +49,10 @@ public class FileSourceConnector implements Source {

    private OffsetStorageReader offsetStorageReader;

    private String filePath;
    private String fileName;
    private BufferedReader bufferedReader;

    @Override
    public Class<? extends Config> configClass() {
        return FileSourceConfig.class;
@@ -45,25 +62,28 @@ public Class<? extends Config> configClass() {
    public void init(Config config) throws Exception {
        // init config for file source connector
        this.sourceConfig = (FileSourceConfig) config;

        this.filePath = buildFilePath(((FileSourceConfig) config).getConnectorConfig().getFilePath());
        this.fileName = buildFileName(((FileSourceConfig) config).getConnectorConfig().getFileName());
    }

    @Override
    public void init(ConnectorContext connectorContext) throws Exception {
        SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
        this.sourceConfig = (FileSourceConfig) sourceConnectorContext.getSourceConfig();
        this.offsetStorageReader = sourceConnectorContext.getOffsetStorageReader();

    }

    @Override
    public void start() throws Exception {

        if (fileName == null || fileName.isEmpty() || filePath == null || filePath.isEmpty()) {
            this.bufferedReader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
        } else {
            this.bufferedReader = Files.newBufferedReader(Paths.get(filePath, fileName), StandardCharsets.UTF_8);
        }
    }

    @Override
    public void commit(ConnectRecord record) {

    }

@Override
@@ -73,12 +93,48 @@ public String name() {

    @Override
    public void stop() {

        try {
            if (bufferedReader != null) {
                bufferedReader.close();
            }
        } catch (Exception e) {
            log.error("Error closing resources: {}", e.getMessage());
        }
    }

    @Override
    public List<ConnectRecord> poll() {
        // (removed in this diff) return null;
        List<ConnectRecord> connectRecords = new ArrayList<>();
        try {
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                ConnectRecord connectRecord = new ConnectRecord(new RecordPartition(), new RecordOffset(), System.currentTimeMillis(), line);
Member:
The offset recording seems redundant because the Source Connector you've implemented reads the entire contents of a file in a single pass and does not perform incremental reads.

Member:
Reading the entire file may potentially impose significant memory pressure. It is necessary to set a memory limit or read the file in segments.

Member:
I don't see where this Connector implements a memory limit or segmented reads based on offsets. If it does, could you explain how? Thank you.

Contributor Author:
I have addressed the implementation of fragmented reads through offset in the updated code. This was achieved by reading the file in chunks and recording the offset based on the last position index in the buffer array. Additionally, I ensured that the last position is updated in each iteration to maintain accurate tracking. Thank you for bringing this to my attention.
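The updated code is not shown in this diff, so the following is only a minimal sketch of what bounded, offset-based reading could look like. `ChunkedLineReader`, `Batch`, `readBatch`, and `maxLines` are hypothetical names chosen for illustration; they are not taken from the PR or from the EventMesh API. The idea is that each call reads at most `maxLines` lines starting at a byte offset and returns the offset where the next call should resume, so memory use per poll stays bounded and the offset becomes meaningful.

```java
import java.io.IOException;
import java.io.RandomAccessFile;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper (not part of the PR): reads a file in bounded batches, resuming from a byte offset. */
final class ChunkedLineReader {

    /** One batch of lines plus the byte offset at which the next batch should start. */
    static final class Batch {
        final List<String> lines;
        final long nextOffset;

        Batch(List<String> lines, long nextOffset) {
            this.lines = lines;
            this.nextOffset = nextOffset;
        }
    }

    /** Reads at most maxLines lines starting at startOffset (in bytes). */
    static Batch readBatch(Path file, long startOffset, int maxLines) throws IOException {
        List<String> lines = new ArrayList<>();
        try (RandomAccessFile raf = new RandomAccessFile(file.toFile(), "r")) {
            raf.seek(startOffset);
            String raw;
            while (lines.size() < maxLines && (raw = raf.readLine()) != null) {
                // RandomAccessFile.readLine maps each byte to one char,
                // so recover the raw bytes and decode them as UTF-8.
                byte[] bytes = raw.getBytes(StandardCharsets.ISO_8859_1);
                lines.add(new String(bytes, StandardCharsets.UTF_8));
            }
            // The file pointer now sits just past the last line terminator consumed.
            return new Batch(lines, raf.getFilePointer());
        }
    }
}
```

A `poll()` built on this would pass the last returned `nextOffset` into the next call and store it as the record offset. Note that `RandomAccessFile.readLine` is unbuffered, and a final line that has no terminator yet is returned as-is, so a production version would likely need buffering and partial-line handling.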

                connectRecords.add(connectRecord);
            }
        } catch (IOException e) {
            log.error("Error reading data from the file: {}", e.getMessage());
        }
        return connectRecords;
    }

    private String buildFilePath(String filePath) {
        Calendar calendar = Calendar.getInstance(Locale.CHINA);
        SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy/MM/dd");
        String formattedDate = dateFormat.format(calendar.getTime());
        String filePath1 = filePath.replace("%s", formattedDate);
        File path = new File(filePath1);
        if (!path.exists()) {
            if (!path.mkdirs()) {
                log.error("make file dir {} error", filePath);
            }
        }
        return filePath1;
    }

    private String buildFileName(String fileName) {
        Calendar calendar = Calendar.getInstance(Locale.CHINA);
        long currentTime = calendar.getTime().getTime();
        String formattedDate = calendar.get(Calendar.HOUR_OF_DAY) + "-" + currentTime;
        return fileName.replace("%s", formattedDate);
    }
}
@@ -15,20 +15,13 @@
# limitations under the License.
#

pubSubConfig:
Member:
Why was the pubSubConfig configuration removed? Without it, this Source Connector will no longer work properly.

Contributor Author:
Updated

    meshAddress: 127.0.0.1:10000
    subject: TopicTest
    idc: FT
    env: PRD
    group: fileSource
    appId: 5032
    userName: fileSourceUser
    passWord: filePassWord
connectorConfig:
    connectorName: fileSource
    nameserver: 127.0.0.1:9877
    topic: TopicTest
Member:
nameserver and topic are actually redundant for this Connector.

Contributor Author:
> nameserver and topic are actually redundant for this Connector.

Updated.

Member:
topic is redundant, too.

Contributor Author:
@pandaapo Sure, I will update the PR. Just for my knowledge, I wanted to know the usage of topic in the source connector. Also, in the ConnectorConfig, the topic was used only once, as in every other source-config.yml file for other source connectors. So, would removing topic be a good idea?

Member:
topic is used as part of the data source definition in other Source Connectors, such as fetching data from a topic of Kafka, RocketMQ, or RabbitMQ. The current Connector fetches data from a file, and filePath and fileName already specify the data source.

Contributor Author:
Updated the PR. Can you please check now?

Member:
OK. I am now starting the CI for this PR. I would appreciate it if you could test the functionality of this File Source Connector locally. I believe it is a very important job.

    commitOffsetIntervalMs: 5000
    fileName: TopicTest-%s
    filePath: TopicTest/%s
Member @Pil0tXia (Jan 2, 2024):
LGTM. Could you please provide a config document here later? That would be helpful.

Member @pandaapo (Jan 3, 2024):
To use this File Source Connector, users will have to place local files in directories structured like XXX/2024/01/03, which I think is not user-friendly. Why not directly use the filePath and fileName configured by the user to create the input stream?

Contributor Author:
Updated
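The suggestion above, opening the input stream straight from the configured filePath and fileName without date substitution, might look roughly like the sketch below. `DirectFileOpener` and `open` are hypothetical names used only for illustration, and the updated PR code may differ. Failing fast on a missing file is an assumption added here; the original `start()` instead falls back to `System.in` when either value is empty.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Hypothetical helper (not part of the PR): opens the configured file as-is. */
final class DirectFileOpener {

    /** Resolves filePath/fileName exactly as configured and opens it as UTF-8 text. */
    static BufferedReader open(String filePath, String fileName) throws IOException {
        Path source = Paths.get(filePath, fileName);
        if (!Files.isRegularFile(source)) {
            // Assumption: report a missing source file instead of creating directories.
            throw new IOException("Source file not found: " + source.toAbsolutePath());
        }
        return Files.newBufferedReader(source, StandardCharsets.UTF_8);
    }
}
```

With this approach a user can point filePath and fileName at any existing file, and no dated directory layout is required.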

offsetStorageConfig:
    offsetStorageType: nacos
    offsetStorageAddr: 127.0.0.1:8848
Member @pandaapo (Jan 3, 2024):
Regarding these offset configurations, as I mentioned before, the File Source Connector you implemented reads all the content of the file at once, and offsets seem to have no effect. Could offsetStorageConfig and commitOffsetIntervalMs be removed?

Contributor Author:
I have implemented the commitOffsetIntervalMs in the updated code.
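How the updated code uses commitOffsetIntervalMs is not visible in this diff. As a rough sketch of one way an interval-based commit could work, the class below remembers the latest offset and hands it to a commit callback only once the configured interval has elapsed. `IntervalOffsetCommitter`, `record`, and the callback are hypothetical names; the real connector would presumably persist the offset through the EventMesh offset storage instead of a plain callback.

```java
import java.util.function.LongConsumer;

/** Hypothetical helper (not part of the PR): commits the latest offset at most once per interval. */
final class IntervalOffsetCommitter {

    private final long commitIntervalMs;
    private final LongConsumer commitAction;
    private long lastCommitTimeMs;

    IntervalOffsetCommitter(long commitIntervalMs, long startTimeMs, LongConsumer commitAction) {
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitTimeMs = startTimeMs;
        this.commitAction = commitAction;
    }

    /**
     * Reports the most recent offset read. The offset is committed only if at
     * least commitIntervalMs has passed since the previous commit.
     *
     * @return true if the offset was committed by this call
     */
    boolean record(long offset, long nowMs) {
        if (nowMs - lastCommitTimeMs >= commitIntervalMs) {
            commitAction.accept(offset);
            lastCommitTimeMs = nowMs;
            return true;
        }
        return false;
    }
}
```

Passing the current time in as a parameter, instead of calling `System.currentTimeMillis()` inside the class, keeps the sketch easy to test. In the connector, `poll()` could call `record(nextOffset, System.currentTimeMillis())` after each batch.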
