Skip to content

Commit

Permalink
Fix side input stores to use logged store directory
Browse files Browse the repository at this point in the history
Side input stores are non-changelog stores that still needs to use logged stored directory to guarantee durability.

Author: bharathkk <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>

Closes apache#701 from bharathkk/side-input-fix
  • Loading branch information
mynameborat authored and prateekm committed Oct 9, 2018
1 parent e312bb5 commit 07199cb
Show file tree
Hide file tree
Showing 2 changed files with 10 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -553,7 +553,9 @@ object SamzaContainer extends Logging {
case _ => null
}

val storeDir = if (changeLogSystemStreamPartition != null) {
// We use the logged storage base directory for change logged and side input stores since side input stores
// dont have changelog configured.
val storeDir = if (changeLogSystemStreamPartition != null || sideInputStoresToSystemStreams.contains(storeName)) {
TaskStorageManager.getStorePartitionDir(loggedStorageBaseDir, storeName, taskName)
} else {
TaskStorageManager.getStorePartitionDir(nonLoggedStorageBaseDir, storeName, taskName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import com.google.common.collect.ImmutableList;

import java.nio.file.FileSystems;
import java.time.Duration;
import java.util.Arrays;
import java.util.HashMap;
Expand All @@ -30,6 +31,7 @@
import org.apache.samza.SamzaException;
import org.apache.samza.application.StreamApplicationDescriptor;
import org.apache.samza.application.StreamApplication;
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.operators.KV;
Expand Down Expand Up @@ -82,6 +84,11 @@ private void runTest(String systemName, StreamApplication app, List<PageView> pa
Map<String, String> configs = new HashMap<>();
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PAGEVIEW_STREAM), systemName);
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), PROFILE_STREAM), systemName);
configs.put(JobConfig.JOB_NON_LOGGED_STORE_BASE_DIR(),
FileSystems.getDefault().getPath("non-logged").toAbsolutePath().toString());
// SideInput Tables needs this to be configured for persisting data
configs.put(JobConfig.JOB_LOGGED_STORE_BASE_DIR(),
FileSystems.getDefault().getPath("logged").toAbsolutePath().toString());
configs.put(String.format(StreamConfig.SYSTEM_FOR_STREAM_ID(), ENRICHED_PAGEVIEW_STREAM), systemName);

InMemorySystemDescriptor isd = new InMemorySystemDescriptor(systemName);
Expand Down

0 comments on commit 07199cb

Please sign in to comment.