Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Syncing from upstream apache/nifi #1

Merged
merged 11 commits into from
Sep 5, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
Expand All @@ -74,7 +75,6 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.nifi.processors.azure.storage.utils.AzureStorageUtils;

import static org.apache.nifi.util.StringUtils.isEmpty;

Expand Down Expand Up @@ -454,9 +454,9 @@ private void writeRecords(PartitionContext context, Iterable<EventData> messages

try (final OutputStream out = session.write(flowFile)) {
for (final EventData eventData : messages) {

try (final InputStream in = new ByteArrayInputStream(eventData.getBytes())) {
final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, logger);
final byte[] eventDataBytes = eventData.getBytes();
try (final InputStream in = new ByteArrayInputStream(eventDataBytes)) {
final RecordReader reader = readerFactory.createRecordReader(schemaRetrievalVariables, in, eventDataBytes.length, logger);

Record record;
while ((record = reader.nextRecord()) != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@

import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.anyMap;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -204,7 +205,7 @@ private void setupRecordReader(List<EventData> eventDataList, int throwException
final RecordReaderFactory readerFactory = mock(RecordReaderFactory.class);
processor.setReaderFactory(readerFactory);
final RecordReader reader = mock(RecordReader.class);
when(readerFactory.createRecordReader(anyMap(), any(), any())).thenReturn(reader);
when(readerFactory.createRecordReader(anyMap(), any(), anyLong(), any())).thenReturn(reader);
final List<Record> recordList = eventDataList.stream()
.map(eventData -> toRecord(new String(eventData.getBytes())))
.collect(Collectors.toList());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public Optional<Record> lookup(Map<String, Object> coordinates) throws LookupFai
recordReaderVariables.put(k, value.toString());
}
});
return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, getLogger()));
return new Tuple<>(null, readerFactory.createRecordReader(recordReaderVariables, in, -1, getLogger()));
} catch (Exception e) {
return new Tuple<>(e, null);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-schema-registry-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record-serialization-service-api</artifactId>
</dependency>
<dependency>
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ public ArrayListRecordReader(final RecordSchema schema) {
}

@Override
public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final ComponentLog logger) {
public ArrayListReader createRecordReader(final Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) {
return new ArrayListReader(records, schema);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,14 @@

package org.apache.nifi.serialization.record;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
Expand All @@ -26,14 +34,6 @@
import java.util.List;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;

public class CommaSeparatedRecordReader extends AbstractControllerService implements RecordReaderFactory {
private int failAfterN;
private int recordCount = 0;
Expand All @@ -51,7 +51,7 @@ public void failAfter(final int failAfterN) {
}

@Override
public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, final InputStream in, final long inputLength, final ComponentLog logger) throws IOException, SchemaNotFoundException {
final BufferedReader reader = new BufferedReader(new InputStreamReader(in));

final List<RecordField> fields = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,6 @@

package org.apache.nifi.serialization.record;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
Expand All @@ -33,6 +25,14 @@
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.SimpleRecordSchema;

import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

public class MockRecordParser extends AbstractControllerService implements RecordReaderFactory {
private final List<Object[]> records = new ArrayList<>();
private final List<RecordField> fields = new ArrayList<>();
Expand Down Expand Up @@ -67,7 +67,7 @@ public void addRecord(Object... values) {
}

@Override
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, ComponentLog logger) throws IOException, SchemaNotFoundException {
public RecordReader createRecordReader(Map<String, String> variables, InputStream in, long inputLength, ComponentLog logger) throws IOException, SchemaNotFoundException {
final Iterator<Object[]> itr = records.iterator();

return new RecordReader() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannel;
import org.apache.nifi.remote.io.socket.ssl.SSLSocketChannelInputStream;
Expand All @@ -30,6 +29,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.Collections;

/**
* Encapsulates an SSLSocketChannel and a RecordReader created for the given channel.
Expand All @@ -54,14 +54,14 @@ public SSLSocketChannelRecordReader(final SocketChannel socketChannel,
}

@Override
public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}

final InputStream socketIn = new SSLSocketChannelInputStream(sslSocketChannel);
final InputStream in = new BufferedInputStream(socketIn);
recordReader = readerFactory.createRecordReader(flowFile, in, logger);
recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
Expand All @@ -32,18 +31,14 @@
public interface SocketChannelRecordReader extends Closeable {

/**
* Currently a RecordReader can only be created with a FlowFile. Since we won't have a FlowFile at the time
* a connection is accepted, this method will be used to lazily create the RecordReader later. Eventually this
* method should be removed and the reader should be passed in through the constructor.
* Lazily creates the RecordReader.
*
*
* @param flowFile the flow file we are creating the reader for
* @param logger the logger of the component creating the reader
* @return a RecordReader
*
* @throws IllegalStateException if create is called after a reader has already been created
*/
RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;
RecordReader createRecordReader(ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException;

/**
* @return the RecordReader created by calling createRecordReader, or null if one has not been created yet
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
*/
package org.apache.nifi.record.listen;

import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.schema.access.SchemaNotFoundException;
import org.apache.nifi.serialization.MalformedRecordException;
Expand All @@ -27,6 +26,7 @@
import java.io.InputStream;
import java.net.InetAddress;
import java.nio.channels.SocketChannel;
import java.util.Collections;

/**
* Encapsulates a SocketChannel and a RecordReader created for the given channel.
Expand All @@ -48,13 +48,13 @@ public StandardSocketChannelRecordReader(final SocketChannel socketChannel,
}

@Override
public RecordReader createRecordReader(final FlowFile flowFile, final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
public RecordReader createRecordReader(final ComponentLog logger) throws IOException, MalformedRecordException, SchemaNotFoundException {
if (recordReader != null) {
throw new IllegalStateException("Cannot create RecordReader because already created");
}

final InputStream in = socketChannel.socket().getInputStream();
recordReader = readerFactory.createRecordReader(flowFile, in, logger);
recordReader = readerFactory.createRecordReader(Collections.emptyMap(), in, -1, logger);
return recordReader;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,7 +515,7 @@ public void swapFlowFilesIn(final String swapLocation, final List<FlowFileRecord
updateRepository(repoRecords, true);

synchronized (this.swapLocationSuffixes) {
this.swapLocationSuffixes.add(normalizeSwapLocation(swapLocation));
this.swapLocationSuffixes.remove(normalizeSwapLocation(swapLocation));
}

logger.info("Repository updated to reflect that {} FlowFiles were swapped in to {}", new Object[]{swapRecords.size(), queue});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4737,6 +4737,7 @@ private Set<FlowDifference> getModifications() {
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
.filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
.filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, versionedGroup, flowManager))
.filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
.collect(Collectors.toCollection(HashSet::new));

LOG.debug("There are {} differences between this Local Flow and the Versioned Flow: {}", differences.size(), differences);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import org.apache.nifi.controller.service.ControllerServiceNode;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.registry.flow.ComponentType;
import org.apache.nifi.registry.flow.ScheduledState;
import org.apache.nifi.registry.flow.VersionedComponent;
import org.apache.nifi.registry.flow.VersionedConnection;
import org.apache.nifi.registry.flow.VersionedFlowCoordinates;
Expand Down Expand Up @@ -163,6 +164,21 @@ private static boolean isNewPropertyWithDefaultValue(final FlowDifference fd, fi
return false;
}

public static boolean isScheduledStateNew(final FlowDifference fd) {
if (fd.getDifferenceType() != DifferenceType.SCHEDULED_STATE_CHANGED) {
return false;
}

// If Scheduled State transitions from null to ENABLED or ENABLED to null, consider it a "new" scheduled state.
if (fd.getValueA() == null && ScheduledState.ENABLED.equals(fd.getValueB())) {
return true;
}
if (fd.getValueB() == null && "ENABLED".equals(fd.getValueA())) {
return true;
}

return false;
}

public static boolean isNewRelationshipAutoTerminatedAndDefaulted(final FlowDifference fd, final VersionedProcessGroup processGroup, final FlowManager flowManager) {
if (fd.getDifferenceType() != DifferenceType.AUTO_TERMINATED_RELATIONSHIPS_CHANGED) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,10 @@ public void testSwapLocationsUpdatedOnRepoUpdate() throws IOException {
assertFalse(repo.isValidSwapLocationSuffix("swap123"));
repo.updateRepository(records);
assertTrue(repo.isValidSwapLocationSuffix("swap123"));

repo.swapFlowFilesIn("/tmp/swap123", Collections.singletonList(flowFileRecord), queue);
assertFalse(repo.isValidSwapLocationSuffix("swap123"));

repo.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1573,6 +1573,13 @@ ProcessGroupEntity updateProcessGroupContents(Revision revision, String groupId,
*/
Set<AffectedComponentEntity> getComponentsAffectedByParameterContextUpdate(ParameterContextDTO parameterContextDto);

/**
* Returns an up-to-date representation of the component that is referenced by the given affected component
* @param affectedComponent the affected component
* @return an up-to-date representation of the affected component
*/
AffectedComponentEntity getUpdatedAffectedComponentEntity(AffectedComponentEntity affectedComponent);

/**
* Returns a Set representing all Processors that reference any Parameters and that belong to the group with the given ID
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1194,6 +1194,46 @@ private Set<AffectedComponentEntity> getComponentsReferencingParameter(final Str
return dtoFactory.createAffectedComponentEntities(affectedComponents, revisionManager);
}

@Override
public AffectedComponentEntity getUpdatedAffectedComponentEntity(final AffectedComponentEntity affectedComponent) {
final AffectedComponentDTO dto = affectedComponent.getComponent();
if (dto == null) {
return affectedComponent;
}

final String groupId = affectedComponent.getProcessGroup().getId();
final ProcessGroup processGroup = processGroupDAO.getProcessGroup(groupId);

final String componentType = dto.getReferenceType();
if (AffectedComponentDTO.COMPONENT_TYPE_CONTROLLER_SERVICE.equals(componentType)) {
final ControllerServiceNode serviceNode = processGroup.getControllerService(dto.getId());
return dtoFactory.createAffectedComponentEntity(serviceNode, revisionManager);
} else if (AffectedComponentDTO.COMPONENT_TYPE_PROCESSOR.equals(componentType)) {
final ProcessorNode processorNode = processGroup.getProcessor(dto.getId());
return dtoFactory.createAffectedComponentEntity(processorNode, revisionManager);
} else if (AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT.equals(componentType)) {
final Port inputPort = processGroup.getInputPort(dto.getId());
final PortEntity portEntity = createInputPortEntity(inputPort);
return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_INPUT_PORT);
} else if (AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT.equals(componentType)) {
final Port outputPort = processGroup.getOutputPort(dto.getId());
final PortEntity portEntity = createOutputPortEntity(outputPort);
return dtoFactory.createAffectedComponentEntity(portEntity, AffectedComponentDTO.COMPONENT_TYPE_OUTPUT_PORT);
} else if (AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT.equals(componentType)) {
final RemoteGroupPort remoteGroupPort = processGroup.findRemoteGroupPort(dto.getId());
final RemoteProcessGroupEntity rpgEntity = createRemoteGroupEntity(remoteGroupPort.getRemoteProcessGroup(), NiFiUserUtils.getNiFiUser());
final RemoteProcessGroupPortDTO remotePortDto = dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
return dtoFactory.createAffectedComponentEntity(remotePortDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_INPUT_PORT, rpgEntity);
} else if (AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT.equals(componentType)) {
final RemoteGroupPort remoteGroupPort = processGroup.findRemoteGroupPort(dto.getId());
final RemoteProcessGroupEntity rpgEntity = createRemoteGroupEntity(remoteGroupPort.getRemoteProcessGroup(), NiFiUserUtils.getNiFiUser());
final RemoteProcessGroupPortDTO remotePortDto = dtoFactory.createRemoteProcessGroupPortDto(remoteGroupPort);
return dtoFactory.createAffectedComponentEntity(remotePortDto, AffectedComponentDTO.COMPONENT_TYPE_REMOTE_OUTPUT_PORT, rpgEntity);
}

return affectedComponent;
}

@Override
public Set<AffectedComponentEntity> getComponentsAffectedByParameterContextUpdate(final ParameterContextDTO parameterContextDto) {
return getComponentsAffectedByParameterContextUpdate(parameterContextDto, true);
Expand Down Expand Up @@ -4529,6 +4569,7 @@ public Set<AffectedComponentEntity> getComponentsAffectedByVersionChange(final S
.filter(FlowDifferenceFilters.FILTER_IGNORABLE_VERSIONED_FLOW_COORDINATE_CHANGES)
.filter(diff -> !FlowDifferenceFilters.isNewPropertyWithDefaultValue(diff, flowManager))
.filter(diff -> !FlowDifferenceFilters.isNewRelationshipAutoTerminatedAndDefaulted(diff, proposedFlow.getContents(), flowManager))
.filter(diff -> !FlowDifferenceFilters.isScheduledStateNew(diff))
.map(difference -> {
final VersionedComponent localComponent = difference.getComponentA();

Expand Down
Loading