Skip to content

Commit

Permalink
updated to 0.8.9
Browse files Browse the repository at this point in the history
  • Loading branch information
robertroeser committed Aug 14, 2018
1 parent e7df71f commit 438a06f
Show file tree
Hide file tree
Showing 8 changed files with 53 additions and 60 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# proteus-spring
[![Build Status](https://travis-ci.org/netifi-proteus/proteus-spring.svg?branch=master)](https://travis-ci.org/netifi-proteus/proteus-spring)
[![Join the chat at https://gitter.im/netifi/general](https://badges.gitter.im/netifi/general.svg)](https://gitter.im/netifi/general)
[![Build Status](https://travis-ci.org/netifi-proteus/proteus-spring.svg?branch=master)](https://travis-ci.org/netifi-proteus/proteus-spring)

A simple and easy mechanism for integrating the power of [Netifi Proteus](https://www.netifi.com) into your [Spring Boot](https://spring.io/projects/spring-boot) applications.

Expand Down
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ allprojects {

ext {
protobufVersion = '3.6.0'
proteusVersion = '0.8.5'
proteusVersion = '0.8.9'
springbomVersion = 'Cairo-SR1'
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

import com.netifi.proteus.demo.core.RandomString;
import com.netifi.proteus.demo.vowelcount.service.VowelCountRequest;
import com.netifi.proteus.demo.vowelcount.service.VowelCountResponse;
import com.netifi.proteus.demo.vowelcount.service.VowelCountServiceClient;
import io.netifi.proteus.annotations.ProteusClient;
import org.slf4j.Logger;
Expand All @@ -26,37 +27,40 @@
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadLocalRandom;

@Component
public class DemoRunner implements CommandLineRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(DemoRunner.class);
private static final Logger LOGGER = LoggerFactory.getLogger(DemoRunner.class);

@Autowired
private RandomString randomString;
@Autowired private RandomString randomString;

@ProteusClient(group = "com.netifi.proteus.demo.vowelcount")
private VowelCountServiceClient client;
@ProteusClient(group = "com.netifi.proteus.demo.vowelcount")
private VowelCountServiceClient client;

@Override
public void run(String... args) throws Exception {
CountDownLatch latch = new CountDownLatch(1);
@Override
public void run(String... args) throws Exception {

// Generate stream of random strings
Flux<VowelCountRequest> requests = Flux.range(1, 100)
.map(cnt -> VowelCountRequest.newBuilder()
// Generate stream of random strings
Flux<VowelCountRequest> requests =
Flux.range(1, 100)
.map(
cnt ->
VowelCountRequest.newBuilder()
.setMessage(randomString.next(10, ThreadLocalRandom.current()))
.build());

// Send stream of random strings to vowel count service
client.countVowels(requests)
.onBackpressureDrop()
.doOnComplete(latch::countDown)
.subscribe(response -> {
LOGGER.info("Total Vowels: {}", response.getVowelCnt());
});
/*
// Send stream of random strings to vowel count service
VowelCountResponse response = client.countVowels(requests).block();
latch.await();
}
LOGGER.info("Total Vowels: {}", response.getVowelCnt());
System.exit(0);
*/

// Send stream of random strings to vowel count service
client.countVowels(requests).doOnError(Throwable::printStackTrace).subscribe(response -> LOGGER.info("Total Vowels: {}", response.getVowelCnt()));

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
netifi.proteus.broker.hostname=localhost
netifi.proteus.broker.port=8001

netifi.proteus.accesskey=3006839580103245170
netifi.proteus.accesstoken=SkOlZxqQcTboZE3fni4OVXVC0e0=
netifi.proteus.accesskey=9007199254740991
netifi.proteus.accesstoken=kTBDVtfRBO4tHOnZzSyY5ym2kfY=
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
netifi.proteus.brokerHostname=localhost
netifi.proteus.brokerPort=8001

netifi.proteus.accessKey=3006839580103245170
netifi.proteus.accessToken=SkOlZxqQcTboZE3fni4OVXVC0e0=
netifi.proteus.accesskey=9007199254740991
netifi.proteus.accesstoken=kTBDVtfRBO4tHOnZzSyY5ym2kfY=
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ option java_outer_classname = "VowelCountServiceProto";
option java_multiple_files = true;

service VowelCountService {
rpc CountVowels (stream VowelCountRequest) returns (stream VowelCountResponse) {}
rpc CountVowels (stream VowelCountRequest) returns (VowelCountResponse) {}
}

message VowelCountRequest {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,38 +23,26 @@
import org.reactivestreams.Publisher;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Flux;

import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import reactor.core.publisher.Mono;

@Component
public class DefaultVowelCountService implements VowelCountService {
private final AtomicLong totalVowels = new AtomicLong(0);

@ProteusClient(group = "com.netifi.proteus.demo.isvowel")
private IsVowelServiceClient isVowelClient;
@ProteusClient(group = "com.netifi.proteus.demo.isvowel")
private IsVowelServiceClient isVowelClient;

@Override
public Flux<VowelCountResponse> countVowels(Publisher<VowelCountRequest> messages, ByteBuf metadata) {
return Flux.from(s ->
Flux.from(messages)
// Split each incoming random string into a stream of individual characters
.flatMap(vowelCountRequest -> Flux.just(vowelCountRequest.getMessage().split("(?<!^)")))
// For each individual character create an IsVowelRequest
.map(c -> IsVowelRequest.newBuilder().setCharacter(c).build())
// Send each IsVowelRequest to the IsVowel service
.flatMap((Function<IsVowelRequest, Publisher<IsVowelResponse>>) isVowelClient::isVowel)
.doOnNext(isVowelResponse -> {
if (isVowelResponse.getIsVowel()) {
// For every vowel found by the IsVowel service, increment the vowel count and send the
// current vowel count to the client
s.onNext(VowelCountResponse.newBuilder()
.setVowelCnt(totalVowels.incrementAndGet())
.build());
}
})
.doOnComplete(s::onComplete)
.subscribe()
);
}
@Override
public Mono<VowelCountResponse> countVowels(
Publisher<VowelCountRequest> messages, ByteBuf metadata) {
return Flux.from(messages)
.flatMap(
request ->
Flux.fromArray(request.getMessage().split("(?<!^)"))
.flatMap(
s ->
isVowelClient
.isVowel(IsVowelRequest.newBuilder().setCharacter(s).build())
.filter(IsVowelResponse::getIsVowel)))
.count()
.map(count -> VowelCountResponse.newBuilder().setVowelCnt(count).build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@
netifi.proteus.brokerHostname=localhost
netifi.proteus.brokerPort=8001

netifi.proteus.accessKey=3006839580103245170
netifi.proteus.accessToken=SkOlZxqQcTboZE3fni4OVXVC0e0=
netifi.proteus.accesskey=9007199254740991
netifi.proteus.accesstoken=kTBDVtfRBO4tHOnZzSyY5ym2kfY=

0 comments on commit 438a06f

Please sign in to comment.