Skip to content

Commit 43b04d1

Browse files
committed
ConsumerActor 액터개선
- 블락킹모드에서 비동기모드로 변경
1 parent cb145d4 commit 43b04d1

8 files changed

Lines changed: 97 additions & 40 deletions

File tree

AkkaDotBootApi/Controllers/KafkaController.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ public KafkaController(ProducerSystem _producerSystem)
2323
}
2424

2525
/// <summary>
26-
/// Kafka 메시지 생성 : System이용
26+
/// Kafka 메시지 생성 : ProducerSystem
2727
/// 개수와 tps조절가능
2828
///
2929
/// testTopic : akka100
@@ -41,7 +41,7 @@ public int Kafka_ProducerMessageByActorSystem(int count, int tps)
4141
}
4242

4343
/// <summary>
44-
/// Kafka 메시지 생성 : Actor이용(이모델을 사용추천)
44+
/// Kafka 메시지 생성 : Actor이용
4545
///
4646
/// testTopic : akka100
4747
/// </summary>

AkkaDotModule/ActorUtils/Confluent/ConsumerActor.cs

Lines changed: 10 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Confluent.Kafka;
66
using System;
77
using System.Threading;
8+
using System.Threading.Tasks;
89

910
namespace AkkaDotModule.ActorUtils.Confluent
1011
{
@@ -82,13 +83,16 @@ public ConsumerActor(ConsumerAkkaOption consumerAkkaOption)
8283

8384
ReceiveAsync<ConsumerPull>(async msg =>
8485
{
85-
IActorRef selfActor = this.Self;
86-
var cr = consumer.Consume(cancellationTokenSource.Token);
87-
selfActor.Tell(new KafkaTextMessage()
86+
await Task.Run(async () =>
8887
{
89-
Topic = cr.Topic,
90-
Message = cr.Message.Value
91-
});
88+
var cr = consumer.Consume(cancellationTokenSource.Token);
89+
var kafkamsg = new KafkaTextMessage()
90+
{
91+
Topic = cr.Topic,
92+
Message = cr.Message.Value
93+
};
94+
return kafkamsg;
95+
}).PipeTo(Self);
9296
});
9397

9498
ReceiveAsync<KafkaTextMessage>(async msg =>

AkkaDotModule/AkkaDotModule.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
66
<PackageId>AkkaDotModule.Webnori</PackageId>
7-
<Version>1.0.9</Version>
7+
<Version>1.1.0</Version>
88
<RepositoryUrl>https://github.com/psmon/AkkaDotModule</RepositoryUrl>
99
<PackageProjectUrl>https://github.com/psmon/AkkaDotModule</PackageProjectUrl>
1010
</PropertyGroup>

AkkaDotModule/Kafka/ConsumerSystem.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,6 @@ public void Start(ConsumerAkkaOption consumerActorOption)
2828
.WithBootstrapServers(consumerActorOption.BootstrapServers)
2929
.WithGroupId(consumerActorOption.KafkaGroupId);
3030

31-
if(consumerActorOption.SecurityOption != null)
32-
{
33-
KafkaSecurityOption kafkaSecurityOption = consumerActorOption.SecurityOption;
34-
}
35-
3631

3732
var materializer_consumer = consumerSystem.Materializer();
3833

AkkaDotModule/Kafka/ProducerSystem.cs

Lines changed: 2 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ namespace AkkaDotModule.Kafka
1717
public class ProducerSystem
1818
{
1919
private ActorSystem producerSystem;
20+
2021
private ActorMaterializer materializer_producer;
2122

2223
private Dictionary<string, ProducerSettings<Null, string>> producerList = new Dictionary<string, ProducerSettings<Null, string>>();
@@ -36,21 +37,7 @@ public void Start(ProducerAkkaOption producerAkkaOption)
3637
var producer = ProducerSettings<Null, string>.Create(producerSystem, null, null)
3738
.WithBootstrapServers(producerAkkaOption.BootstrapServers);
3839

39-
if(producerAkkaOption.SecurityOption != null)
40-
{
41-
KafkaSecurityOption kafkaSecurityOption = producerAkkaOption.SecurityOption;
42-
/*
43-
producer = producer
44-
.WithProperty("security.protocol", kafkaSecurityOption.SecurityProtocol)
45-
.WithProperty("sasl.mechanism", kafkaSecurityOption.SaslMechanism)
46-
.WithProperty("sasl.username", kafkaSecurityOption.SaslUsername)
47-
.WithProperty("sasl.password", kafkaSecurityOption.SaslPassword);
48-
//.WithProperty("kafka-clients.ssl.calocation", kafkaSecurityOption.SslCaLocation);
49-
*/
50-
};
51-
5240
producerList[producerAkkaOption.ProducerName] = producer;
53-
5441
}
5542

5643
public void SinkMessage(string producerName, string topic,List<string> message,int tps)
@@ -59,7 +46,7 @@ public void SinkMessage(string producerName, string topic,List<string> message,i
5946

6047
Source<string, NotUsed> source = Source.From(message);
6148
source
62-
.Throttle(tps, TimeSpan.FromSeconds(1), 1, ThrottleMode.Shaping) //TPS
49+
.Throttle(tps, TimeSpan.FromSeconds(1), 100, ThrottleMode.Shaping) //TPS
6350
.Select(c =>
6451
{
6552
return c;

TestAkkaDotModule/ActorSample/PersistentActor.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public ExampleState(ImmutableList<string> events)
4040
}
4141

4242
public ExampleState() : this(ImmutableList.Create<string>())
43-
{
43+
{
4444
}
4545

4646
public ExampleState Updated(Evt evt)

TestAkkaDotModule/TestActors/RouterTest.cs

Lines changed: 9 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -26,22 +26,21 @@ public void Setup()
2626
/// <summary>
2727
/// 사용목적 : 액터를 라운드 로빈으로 구성하고 분산처리할때
2828
/// </summary>
29-
/// <param name="timeSec"></param>
29+
/// <param name="poolLength"></param>
3030
/// <param name="elemntPerSec"></param>
31-
[Theory(DisplayName = "fire를 5번 전송하면, 5개의 액터가 각각 균등처리 ")]
32-
[InlineData(3)]
33-
public void TestRoundRobbin(int expectedTestSec)
31+
[Theory(DisplayName = "fire를 n번 전송하면, n개의 액터가 각각 균등처리 ")]
32+
[InlineData(10,3)]
33+
public void TestRoundRobbin(int poolLength,int expectedTestSec)
3434
{
3535
var helloActor = Sys.ActorOf(Props.Create(() =>
36-
new HelloActor("Pool5")).WithRouter(new RoundRobinPool(5)));
36+
new HelloActor("Pool5")).WithRouter(new RoundRobinPool(poolLength)));
3737

3838
Within(TimeSpan.FromSeconds(expectedTestSec), () =>
3939
{
40-
helloActor.Tell("fire1", this.TestActor);
41-
helloActor.Tell("fire2", this.TestActor);
42-
helloActor.Tell("fire3", this.TestActor);
43-
helloActor.Tell("fire4", this.TestActor);
44-
helloActor.Tell("fire5", this.TestActor);
40+
for(int i=0; i< poolLength; i++)
41+
{
42+
helloActor.Tell("fire:"+(i+1), this.TestActor);
43+
}
4544

4645
// 응답메시지가 없음을 검사(대기)
4746
probe.ExpectNoMsg(TimeSpan.FromSeconds(1));
Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
using Akka;
2+
using Akka.Actor;
3+
using Akka.Routing;
4+
using Akka.Streams;
5+
using Akka.Streams.Dsl;
6+
using Akka.TestKit;
7+
using AkkaDotModule.ActorSample;
8+
using AkkaNetCoreTest;
9+
using System;
10+
using System.Linq;
11+
using Xunit;
12+
using Xunit.Abstractions;
13+
14+
namespace TestAkkaDotModule.TestActors
15+
{
16+
public class StreamTest : TestKitXunit
17+
{
18+
protected TestProbe probe;
19+
20+
public StreamTest(ITestOutputHelper output) : base(output)
21+
{
22+
Setup();
23+
}
24+
25+
public void Setup()
26+
{
27+
//스트림을 제공받는 최종 소비자 ( 물을 제공 받는 고객 )
28+
probe = this.CreateTestProbe();
29+
}
30+
31+
/// <summary>
32+
/// 사용목적 : 특정 메시지를 송신하고, 완료처리를 비동기로 받을때 사용
33+
/// </summary>
34+
/// <param name="elemntPerSec"></param>
35+
[Theory(DisplayName = "Graph연산규칙에따라 Stream연산이된다")]
36+
[InlineData(3)]
37+
public void TestFireAndForget(int expectedTestSec)
38+
{
39+
var materializer = Sys.Materializer();
40+
41+
var g = RunnableGraph.FromGraph(GraphDsl.Create(builder =>
42+
{
43+
var source = Source.From(Enumerable.Range(1, 10));
44+
var sink = Sink.Ignore<int>().MapMaterializedValue(_ => NotUsed.Instance);
45+
var sinkConsole = Sink.ForEach<int>(x => Console.WriteLine(x.ToString()))
46+
.MapMaterializedValue(_ => NotUsed.Instance);
47+
48+
var broadcast = builder.Add(new Broadcast<int>(2));
49+
var merge = builder.Add(new Merge<int>(2));
50+
51+
var f1 = Flow.Create<int>().Select(x => x + 10);
52+
var f2 = Flow.Create<int>().Select(x => x + 20);
53+
var f3 = Flow.Create<int>().Select(x => x + 1);
54+
var f4 = Flow.Create<int>().Select(x => x + 10);
55+
56+
builder.From(source).Via(f1).Via(broadcast).Via(f2).Via(merge).Via(f3).To(sinkConsole);
57+
builder.From(broadcast).Via(f4).To(merge);
58+
59+
return ClosedShape.Instance;
60+
}));
61+
62+
g.Run(materializer);
63+
64+
65+
Within(TimeSpan.FromSeconds(expectedTestSec), () =>
66+
{
67+
// 응답메시지가 없음을 검사
68+
probe.ExpectNoMsg(TimeSpan.FromSeconds(1));
69+
});
70+
}
71+
}
72+
}

0 commit comments

Comments
 (0)