Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,16 @@
<artifactId>targeting-engine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.split.client</groupId>
<artifactId>parsing-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.split.client</groupId>
<artifactId>segment-commons</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>io.split.client</groupId>
<artifactId>pluggable-storage</artifactId>
Expand Down
18 changes: 16 additions & 2 deletions client/src/main/java/io/split/client/SplitFactoryImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,10 @@
import io.split.engine.experiments.SplitParser;
import io.split.engine.experiments.SplitSynchronizationTask;
import io.split.engine.experiments.RuleBasedSegmentParser;
import io.split.engine.segments.ExecutorFactory;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.segments.TelemetryListener;
import io.split.integrations.IntegrationsConfig;
import io.split.service.SplitHttpClientImpl;
import io.split.service.SplitHttpClient;
Expand All @@ -85,6 +87,7 @@
import io.split.storages.pluggable.adapters.UserCustomRuleBasedSegmentAdapterConsumer;
import io.split.storages.pluggable.domain.UserStorageWrapper;
import io.split.storages.pluggable.synchronizer.TelemetryConsumerSubmitter;
import io.split.telemetry.domain.enums.LastSynchronizationRecordsEnum;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.NoopTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
Expand Down Expand Up @@ -129,6 +132,7 @@
import java.util.List;
import java.util.ArrayList;

import io.split.client.utils.SplitExecutorFactory;
import static io.split.client.utils.SplitExecutorFactory.buildExecutorService;

public class SplitFactoryImpl implements SplitFactory {
Expand Down Expand Up @@ -427,12 +431,17 @@ protected SplitFactoryImpl(SplitClientConfig config) {
segmentChangeFetcher = new LocalhostSegmentChangeFetcher(config.segmentDirectory());
}

TelemetryListener segmentTelemetryListener =
t -> _telemetryStorageProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, t);
ExecutorFactory segmentExecutorFactory =
(tf, name, n) -> SplitExecutorFactory.buildScheduledExecutorService(tf, name, n);
_segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher,
config.segmentsRefreshRate(),
config.numThreadsForSegmentFetch(),
segmentCache,
_telemetryStorageProducer,
segmentTelemetryListener,
_splitCache,
segmentExecutorFactory,
config.getThreadFactory(),
ruleBasedSegmentCache);

Expand Down Expand Up @@ -696,12 +705,17 @@ private SegmentSynchronizationTaskImp buildSegments(SplitClientConfig config,
SegmentChangeFetcher segmentChangeFetcher = HttpSegmentChangeFetcher.create(_splitHttpClient, _rootTarget,
_telemetryStorageProducer);

TelemetryListener segTelemetryListener =
t -> _telemetryStorageProducer.recordSuccessfulSync(LastSynchronizationRecordsEnum.SEGMENTS, t);
ExecutorFactory segExecutorFactory =
(tf, name, n) -> SplitExecutorFactory.buildScheduledExecutorService(tf, name, n);
return new SegmentSynchronizationTaskImp(segmentChangeFetcher,
config.segmentsRefreshRate(),
config.numThreadsForSegmentFetch(),
segmentCacheProducer,
_telemetryStorageProducer,
segTelemetryListener,
splitCacheConsumer,
segExecutorFactory,
config.getThreadFactory(),
ruleBasedSegmentCache);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package io.split.storages;

import java.util.Set;

public interface SplitCacheCommons {
public interface SplitCacheCommons extends SegmentsProvider {
long getChangeNumber();
Set<String> getSegments();
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import io.split.client.utils.FileInputStreamProvider;
import io.split.client.utils.InputStreamProvider;
import io.split.engine.experiments.*;
import io.split.engine.segments.ExecutorFactory;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.segments.TelemetryListener;
import io.split.storages.*;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
Expand All @@ -24,6 +26,8 @@
public class LocalhostSynchronizerTest {

private static final TelemetryStorage TELEMETRY_STORAGE_NOOP = Mockito.mock(NoopTelemetryStorage.class);
private static final TelemetryListener TELEMETRY_LISTENER = t -> {};
private static final ExecutorFactory EXECUTOR_FACTORY = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);
private static final FlagSetsFilter FLAG_SETS_FILTER = new FlagSetsFilterImpl(new HashSet<>());

@Test
Expand All @@ -45,7 +49,7 @@ public void testSyncAll(){
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();

SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
TELEMETRY_LISTENER, splitCacheProducer, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);

LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, false);
Expand All @@ -72,7 +76,7 @@ public void testPeriodicFetching() throws InterruptedException {
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();

SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
TELEMETRY_LISTENER, splitCacheProducer, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);

SplitTasks splitTasks = SplitTasks.build(splitSynchronizationTask, segmentSynchronizationTaskImp, null, null, null, null);
LocalhostSynchronizer localhostSynchronizer = new LocalhostSynchronizer(splitTasks, splitFetcher, true);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,10 @@
import io.split.client.impressions.UniqueKeysTracker;
import io.split.client.interceptors.FlagSetsFilter;
import io.split.client.interceptors.FlagSetsFilterImpl;
import io.split.engine.segments.ExecutorFactory;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.segments.TelemetryListener;
import io.split.storages.*;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.engine.experiments.FetchResult;
Expand Down Expand Up @@ -81,9 +83,11 @@ public void syncAll() throws InterruptedException {

@Test
public void testSyncAllSegments() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
ExecutorFactory executorFactory = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);
TelemetryListener telemetryListener = t -> {};
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(Mockito.mock(SegmentChangeFetcher.class),
20L, 1, _segmentCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class),
Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
20L, 1, _segmentCacheProducer, telemetryListener,
Mockito.mock(SplitCacheConsumer.class), executorFactory, null, Mockito.mock(RuleBasedSegmentCache.class));
Field synchronizerSegmentFetcher = SynchronizerImp.class.getDeclaredField("_segmentSynchronizationTaskImp");
synchronizerSegmentFetcher.setAccessible(true);
Field modifiersField = Field.class.getDeclaredField("modifiers");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@
import io.split.engine.common.FetchOptions;
import io.split.rules.matchers.AllKeysMatcher;
import io.split.rules.matchers.CombiningMatcher;
import io.split.engine.segments.ExecutorFactory;
import io.split.engine.segments.SegmentChangeFetcher;
import io.split.engine.segments.SegmentSynchronizationTask;
import io.split.engine.segments.SegmentSynchronizationTaskImp;
import io.split.engine.segments.TelemetryListener;
import io.split.grammar.Treatments;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
Expand Down Expand Up @@ -52,6 +54,8 @@
public class SplitFetcherTest {
private static final Logger _log = LoggerFactory.getLogger(SplitFetcherTest.class);
private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
private static final TelemetryListener TELEMETRY_LISTENER = t -> {};
private static final ExecutorFactory EXECUTOR_FACTORY = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);
private static final FlagSetsFilter FLAG_SETS_FILTER = new FlagSetsFilterImpl(new HashSet<>());

@Test
Expand Down Expand Up @@ -159,7 +163,7 @@ public void whenParserFailsWeRemoveTheExperiment() throws InterruptedException {
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1, 10, segmentCache, TELEMETRY_LISTENER, cache, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCache);
Expand All @@ -184,7 +188,7 @@ public void ifThereIsAProblemTalkingToSplitChangeCountDownLatchIsNotDecremented(
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, TELEMETRY_STORAGE, cache, null, ruleBasedSegmentCache);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1, 10, segmentCache, TELEMETRY_LISTENER, cache, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(splitChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCache);
Expand Down Expand Up @@ -286,7 +290,7 @@ public void worksWithUserDefinedSegments() throws Exception {
SegmentChangeFetcher segmentChangeFetcher = mock(SegmentChangeFetcher.class);
SegmentChange segmentChange = getSegmentChange(0L, 0L, segmentName);
when(segmentChangeFetcher.fetch(anyString(), anyLong(), any())).thenReturn(segmentChange);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1,10, segmentCache, Mockito.mock(TelemetryStorage.class), cache, null, ruleBasedSegmentCache);
SegmentSynchronizationTask segmentSynchronizationTask = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1, 10, segmentCache, TELEMETRY_LISTENER, cache, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);
segmentSynchronizationTask.start();
SplitFetcherImp fetcher = new SplitFetcherImp(experimentChangeFetcher, new SplitParser(), cache, TELEMETRY_STORAGE, FLAG_SETS_FILTER,
ruleBasedSegmentParser, ruleBasedSegmentCache);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package io.split.engine.segments;

import com.google.common.collect.Maps;
import io.split.Spec;
import io.split.client.LocalhostSegmentChangeFetcher;
import io.split.client.JsonLocalhostSplitChangeFetcher;
import io.split.client.interceptors.FlagSetsFilter;
Expand All @@ -11,12 +9,10 @@
import io.split.engine.common.FetchOptions;
import io.split.engine.experiments.*;
import io.split.storages.*;
import io.split.telemetry.storage.TelemetryRuntimeProducer;
import io.split.storages.memory.InMemoryCacheImp;
import io.split.storages.memory.RuleBasedSegmentCacheInMemoryImp;
import io.split.storages.memory.SegmentCacheInMemoryImpl;
import io.split.telemetry.storage.InMemoryTelemetryStorage;
import io.split.telemetry.storage.NoopTelemetryStorage;
import io.split.telemetry.storage.TelemetryStorage;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
Expand All @@ -32,6 +28,7 @@
import java.lang.reflect.Modifier;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
Expand All @@ -47,8 +44,9 @@
*/
public class SegmentSynchronizationTaskImpTest {
private static final Logger _log = LoggerFactory.getLogger(SegmentSynchronizationTaskImpTest.class);
private static final TelemetryStorage TELEMETRY_STORAGE = Mockito.mock(InMemoryTelemetryStorage.class);
private static final TelemetryStorage TELEMETRY_STORAGE_NOOP = Mockito.mock(NoopTelemetryStorage.class);
private static final TelemetryListener TELEMETRY_STORAGE = t -> {};
private static final TelemetryListener TELEMETRY_STORAGE_NOOP = t -> {};
private static final ExecutorFactory EXECUTOR_FACTORY = (tf, name, n) -> java.util.concurrent.Executors.newScheduledThreadPool(n);

private AtomicReference<SegmentFetcher> fetcher1 = null;
private AtomicReference<SegmentFetcher> fetcher2 = null;
Expand All @@ -65,7 +63,7 @@ public void works() {

SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class);
final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, segmentCacheProducer,
TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), EXECUTOR_FACTORY, null, Mockito.mock(RuleBasedSegmentCache.class));


// create two tasks that will separately call segment and make sure
Expand Down Expand Up @@ -104,13 +102,13 @@ public void run() {
@Test
public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, IllegalAccessException {
SegmentCacheProducer segmentCacheProducer = Mockito.mock(SegmentCacheProducer.class);
ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
ConcurrentMap<String, SegmentFetcher> _segmentFetchers = new ConcurrentHashMap<>();

SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class);
SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class);
_segmentFetchers.put("SF", segmentFetcher);
final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1,
segmentCacheProducer, TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
segmentCacheProducer, TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), EXECUTOR_FACTORY, null, Mockito.mock(RuleBasedSegmentCache.class));
Mockito.when(segmentFetcher.runWhitCacheHeader()).thenReturn(false);
Mockito.when(segmentFetcher.fetch(Mockito.anyObject())).thenReturn(false);

Expand All @@ -130,11 +128,11 @@ public void testFetchAllAsynchronousAndGetFalse() throws NoSuchFieldException, I
public void testFetchAllAsynchronousAndGetTrue() throws NoSuchFieldException, IllegalAccessException {
SegmentCacheProducer segmentCacheProducer = Mockito.mock(SegmentCacheProducer.class);

ConcurrentMap<String, SegmentFetcher> _segmentFetchers = Maps.newConcurrentMap();
ConcurrentMap<String, SegmentFetcher> _segmentFetchers = new ConcurrentHashMap<>();
SegmentChangeFetcher segmentChangeFetcher = Mockito.mock(SegmentChangeFetcher.class);
SegmentFetcherImp segmentFetcher = Mockito.mock(SegmentFetcherImp.class);
final SegmentSynchronizationTaskImp fetchers = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1L, 1, segmentCacheProducer,
TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), null, Mockito.mock(RuleBasedSegmentCache.class));
TELEMETRY_STORAGE, Mockito.mock(SplitCacheConsumer.class), EXECUTOR_FACTORY, null, Mockito.mock(RuleBasedSegmentCache.class));

// Before executing, we'll update the map of segmentFecthers via reflection.
Field segmentFetchersForced = SegmentSynchronizationTaskImp.class.getDeclaredField("_segmentFetchers");
Expand Down Expand Up @@ -162,7 +160,7 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil
RuleBasedSegmentCache ruleBasedSegmentCache = new RuleBasedSegmentCacheInMemoryImp();
RuleBasedSegmentParser ruleBasedSegmentParser = new RuleBasedSegmentParser();

SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, TELEMETRY_STORAGE_NOOP, flagSetsFilter,
SplitFetcher splitFetcher = new SplitFetcherImp(splitChangeFetcher, splitParser, splitCacheProducer, Mockito.mock(TelemetryRuntimeProducer.class), flagSetsFilter,
ruleBasedSegmentParser, ruleBasedSegmentCache);

SplitSynchronizationTask splitSynchronizationTask = new SplitSynchronizationTask(splitFetcher, splitCacheProducer, 1000, null);
Expand All @@ -175,7 +173,7 @@ public void testLocalhostSegmentChangeFetcher() throws InterruptedException, Fil
SegmentCacheProducer segmentCacheProducer = new SegmentCacheInMemoryImpl();

SegmentSynchronizationTaskImp segmentSynchronizationTaskImp = new SegmentSynchronizationTaskImp(segmentChangeFetcher, 1000, 1, segmentCacheProducer,
TELEMETRY_STORAGE_NOOP, splitCacheProducer, null, ruleBasedSegmentCache);
TELEMETRY_STORAGE_NOOP, splitCacheProducer, EXECUTOR_FACTORY, null, ruleBasedSegmentCache);

segmentSynchronizationTaskImp.start();

Expand Down
36 changes: 36 additions & 0 deletions parsing-commons/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
<?xml version="1.0"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.split.client</groupId>
<artifactId>java-client-parent</artifactId>
<version>4.18.3</version>
</parent>

<version>4.18.3</version>
<artifactId>parsing-commons</artifactId>
<packaging>jar</packaging>
<name>Parsing Commons</name>
<description>DTOs and parsing utilities shared between Split SDK modules</description>

<dependencies>
<dependency>
<groupId>io.split.client</groupId>
<artifactId>targeting-engine</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>5.14.2</version>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Loading
Loading