Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #4635] Implemented the functions of file source connector #4650

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions eventmesh-connectors/eventmesh-connector-file/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

dependencies {
api project(":eventmesh-openconnect:eventmesh-openconnect-java")
testImplementation('org.junit.jupiter:junit-jupiter')
testImplementation('org.mockito:mockito-junit-jupiter')
compileOnly 'org.projectlombok:lombok'
annotationProcessor 'org.projectlombok:lombok'
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,5 @@ public class SourceConnectorConfig {

private String connectorName;

private String nameserver;

private String topic;

private long commitOffsetIntervalMs = 5 * 1000;
private String filePath;
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,30 @@
import org.apache.eventmesh.openconnect.api.connector.SourceConnectorContext;
import org.apache.eventmesh.openconnect.api.source.Source;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;
import org.apache.eventmesh.openconnect.offsetmgmt.api.storage.OffsetStorageReader;

import org.apache.eventmesh.openconnect.offsetmgmt.api.data.RecordPartition;

import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class FileSourceConnector implements Source {

private static final int BUFFER_SIZE = 8192;
private FileSourceConfig sourceConfig;

private OffsetStorageReader offsetStorageReader;
private String filePath;
private String fileName;
private BufferedReader bufferedReader;

@Override
public Class<? extends Config> configClass() {
Expand All @@ -45,20 +57,24 @@ public Class<? extends Config> configClass() {
public void init(Config config) throws Exception {
// init config for hdfs source connector
this.sourceConfig = (FileSourceConfig) config;

this.filePath = ((FileSourceConfig) config).getConnectorConfig().getFilePath();
this.fileName = getFileName(filePath);
}

@Override
public void init(ConnectorContext connectorContext) throws Exception {
SourceConnectorContext sourceConnectorContext = (SourceConnectorContext) connectorContext;
this.sourceConfig = (FileSourceConfig) sourceConnectorContext.getSourceConfig();
this.offsetStorageReader = sourceConnectorContext.getOffsetStorageReader();

}

@Override
public void start() throws Exception {

if (filePath == null || filePath.isEmpty()) {
this.bufferedReader = new BufferedReader(new InputStreamReader(System.in, StandardCharsets.UTF_8));
} else {
Path path = Paths.get(filePath);
this.bufferedReader = new BufferedReader(new InputStreamReader(Files.newInputStream(path), StandardCharsets.UTF_8), BUFFER_SIZE);
}
}

@Override
Expand All @@ -73,12 +89,42 @@ public String name() {

@Override
public void stop() {

try {
if (bufferedReader != null) {
bufferedReader.close();
}
} catch (Exception e) {
log.error("Error closing resources: {}", e.getMessage());
}
}

@Override
public List<ConnectRecord> poll() {
return null;
List<ConnectRecord> connectRecords = new ArrayList<>();
RecordPartition recordPartition = convertToRecordPartition(fileName);
try {
int bytesRead;
char[] buffer = new char[BUFFER_SIZE];
while ((bytesRead = bufferedReader.read(buffer)) != -1) {
String line = new String(buffer, 0, bytesRead);
long timeStamp = System.currentTimeMillis();
ConnectRecord connectRecord = new ConnectRecord(recordPartition, null, timeStamp, line);
connectRecords.add(connectRecord);
}
} catch (IOException e) {
log.error("Error reading data from the file: {}", e.getMessage());
}
return connectRecords;
}

public static RecordPartition convertToRecordPartition(String fileName) {
Map<String, String> map = new HashMap<>();
map.put("fileName", fileName);
return new RecordPartition(map);
}

private static String getFileName(String filePath) throws NullPointerException {
File file = new File(filePath);
return file.getName();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
#

pubSubConfig:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why was pubSubConfig configuration removed? With this configuration removed, this Source Connector will no longer work properly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Updated

meshAddress: 127.0.0.1:10000
subject: TopicTest
Expand All @@ -26,15 +25,5 @@ pubSubConfig:
passWord: filePassWord
connectorConfig:
connectorName: fileSource
nameserver: 127.0.0.1:9877
topic: TopicTest
commitOffsetIntervalMs: 5000
offsetStorageConfig:
offsetStorageType: nacos
offsetStorageAddr: 127.0.0.1:8848
extensions: {
#same with topic
dataId: TopicTest,
#same with group
group: fileSource
}
filePath: userFilePath

Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.eventmesh.connector.file;

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import org.apache.eventmesh.connector.file.source.config.FileSourceConfig;
import org.apache.eventmesh.connector.file.source.config.SourceConnectorConfig;
import org.apache.eventmesh.connector.file.source.connector.FileSourceConnector;
import org.apache.eventmesh.openconnect.offsetmgmt.api.data.ConnectRecord;

import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import org.mockito.Mock;

class FileSourceConnectorTest {

private FileSourceConnector fileSourceConnector;
@Mock
private FileSourceConfig fileSourceConfig;

@Test
void testFileSourceConnector() throws Exception {
String directoryPath = "d/g/";
Path directory = Paths.get(directoryPath);
Files.createDirectories(directory);
Path newFilePath = directory.resolve("foo.txt");
Files.createFile(newFilePath);
fileSourceConfig = mock(FileSourceConfig.class);
SourceConnectorConfig connectorConfig = mock(SourceConnectorConfig.class);
when(fileSourceConfig.getConnectorConfig()).thenReturn(connectorConfig);
when(fileSourceConfig.getConnectorConfig().getFilePath()).thenReturn("d/g/foo.txt");
String filePath = fileSourceConfig.getConnectorConfig().getFilePath();
Path mockPath = Paths.get(filePath);
String content = "line1\nline2\nline3";
byte[] contentBytes = content.getBytes(StandardCharsets.UTF_8);
Files.write(mockPath, contentBytes);
fileSourceConnector = new FileSourceConnector();
fileSourceConnector.init(fileSourceConfig);
fileSourceConnector.start();
List<ConnectRecord> connectRecords = fileSourceConnector.poll();
fileSourceConnector.stop();
Files.delete(newFilePath);
Assertions.assertEquals(content, connectRecords.get(0).getData().toString());
}
}
Loading