Skip to content

Commit fc38c88

Browse files
authored
Merge pull request #3 from smpark-luna/master
카프카 도커인프라 추가
2 parents 1c1a828 + 141b55b commit fc38c88

9 files changed

Lines changed: 200 additions & 61 deletions

File tree

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
using System;
2+
using System.Collections.Generic;
3+
using System.Linq;
4+
using System.Threading.Tasks;
5+
using AkkaDotModule.Kafka;
6+
using Microsoft.AspNetCore.Http;
7+
using Microsoft.AspNetCore.Mvc;
8+
9+
namespace AkkaDotBootApi.Controllers
10+
{
11+
[Route("api/[controller]")]
12+
[ApiController]
13+
public class KafkaController : ControllerBase
14+
{
15+
private ProducerSystem producerSystem;
16+
17+
public KafkaController(ProducerSystem _producerSystem)
18+
{
19+
producerSystem = _producerSystem;
20+
}
21+
22+
/// <summary>
23+
/// Kafka 메시지 생성
24+
/// 개수와 tps조절가능
25+
///
26+
/// testTopic : akka100
27+
/// </summary>
28+
[HttpGet("HelloActor-Tell")]
29+
public int Kafka_ProducerMessage(int count,int tps)
30+
{
31+
List<string> messages = new List<string>();
32+
for (int i = 0; i < count; i++)
33+
{
34+
messages.Add($"message-{i}");
35+
}
36+
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
37+
return 1;
38+
}
39+
}
40+
}

AkkaDotBootApi/Startup.cs

Lines changed: 6 additions & 59 deletions
Original file line numberDiff line numberDiff line change
@@ -1,20 +1,16 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.IO;
4-
using System.Reflection;
5-
using Akka.Actor;
6-
using AkkaDotBootApi.Actor;
7-
using AkkaDotModule.ActorSample;
8-
using AkkaDotModule.ActorUtils;
1+
using Akka.Actor;
2+
using AkkaDotBootApi.Test;
93
using AkkaDotModule.Config;
104
using AkkaDotModule.Kafka;
11-
using AkkaDotModule.Models;
125
using Microsoft.AspNetCore.Builder;
136
using Microsoft.AspNetCore.Hosting;
147
using Microsoft.Extensions.Configuration;
158
using Microsoft.Extensions.DependencyInjection;
169
using Microsoft.Extensions.Hosting;
1710
using Microsoft.OpenApi.Models;
11+
using System;
12+
using System.IO;
13+
using System.Reflection;
1814
using IApplicationLifetime = Microsoft.Extensions.Hosting.IApplicationLifetime;
1915

2016
namespace AkkaDotBootApi
@@ -131,56 +127,7 @@ public void Configure(IApplicationBuilder app, IWebHostEnvironment env, IApplica
131127

132128
lifetime.ApplicationStarted.Register(() =>
133129
{
134-
// HelloActor 기본액터
135-
AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/,
136-
actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/
137-
));
138-
139-
var helloActor = actorSystem.ActorSelection("user/helloActor");
140-
var helloActor2 = AkkaLoad.ActorSelect("helloActor");
141-
142-
helloActor.Tell("hello");
143-
helloActor2.Tell("hello");
144-
145-
146-
// 밸브 Work : 초당 작업량을 조절
147-
int timeSec = 1;
148-
int elemntPerSec = 5;
149-
var throttleWork = AkkaLoad.RegisterActor("throttleWork",
150-
actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork"));
151-
152-
// 실제 Work : 밸브에 방출되는 Task를 개별로 처리
153-
var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create<WorkActor>(), "worker"));
154-
155-
// 배브의 작업자를 지정
156-
throttleWork.Tell(new SetTarget(worker));
157-
158-
// KAFKA 셋팅
159-
// 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능
160-
var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
161-
var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();
162-
163-
//소비자 : 복수개의 소비자 생성가능
164-
consumerSystem.Start(new ConsumerAkkaOption()
165-
{
166-
KafkaGroupId = "testGroup",
167-
KafkaUrl = "kafka:9092",
168-
RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현
169-
Topics = "akka100"
170-
});
171-
172-
//생산자 : 복수개의 생산자 생성가능
173-
producerSystem.Start(new ProducerAkkaOption()
174-
{
175-
KafkaUrl = "kafka:9092",
176-
ProducerName = "producer1"
177-
});
178-
179-
List<string> messages = new List<string>();
180-
//보너스 : 생산의 속도를 조절할수 있습니다.
181-
int tps = 10;
182-
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
183-
130+
TestAkka.Run(app, actorSystem);
184131
});
185132
}
186133
}

AkkaDotBootApi/Test/TestAkka.cs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
using Akka.Actor;
2+
using AkkaDotBootApi.Actor;
3+
using AkkaDotModule.ActorSample;
4+
using AkkaDotModule.ActorUtils;
5+
using AkkaDotModule.Config;
6+
using AkkaDotModule.Kafka;
7+
using AkkaDotModule.Models;
8+
using Microsoft.AspNetCore.Builder;
9+
using Microsoft.Extensions.DependencyInjection;
10+
using System.Collections.Generic;
11+
12+
namespace AkkaDotBootApi.Test
13+
{
14+
public class TestAkka
15+
{
16+
static public void Run(IApplicationBuilder app, ActorSystem actorSystem)
17+
{
18+
// HelloActor 기본액터
19+
AkkaLoad.RegisterActor("helloActor" /*AkkaLoad가 인식하는 유니크명*/,
20+
actorSystem.ActorOf(Props.Create(() => new HelloActor("webnori")), "helloActor" /*AKKA가 인식하는 Path명*/
21+
));
22+
23+
var helloActor = actorSystem.ActorSelection("user/helloActor");
24+
var helloActor2 = AkkaLoad.ActorSelect("helloActor");
25+
26+
helloActor.Tell("hello");
27+
helloActor2.Tell("hello");
28+
29+
30+
// 밸브 Work : 초당 작업량을 조절
31+
int timeSec = 1;
32+
int elemntPerSec = 5;
33+
var throttleWork = AkkaLoad.RegisterActor("throttleWork",
34+
actorSystem.ActorOf(Props.Create(() => new ThrottleWork(elemntPerSec, timeSec)), "throttleWork"));
35+
36+
// 실제 Work : 밸브에 방출되는 Task를 개별로 처리
37+
var worker = AkkaLoad.RegisterActor("worker", actorSystem.ActorOf(Props.Create<WorkActor>(), "worker"));
38+
39+
// 배브의 작업자를 지정
40+
throttleWork.Tell(new SetTarget(worker));
41+
42+
// KAFKA 셋팅
43+
// 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능
44+
var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
45+
var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();
46+
47+
//소비자 : 복수개의 소비자 생성가능
48+
consumerSystem.Start(new ConsumerAkkaOption()
49+
{
50+
KafkaGroupId = "testGroup",
51+
KafkaUrl = "kafka:9092",
52+
RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현
53+
Topics = "akka100"
54+
});
55+
56+
//생산자 : 복수개의 생산자 생성가능
57+
producerSystem.Start(new ProducerAkkaOption()
58+
{
59+
KafkaUrl = "kafka:9092",
60+
ProducerName = "producer1"
61+
});
62+
63+
List<string> messages = new List<string>();
64+
for(int i=0; i < 10; i++)
65+
{
66+
messages.Add($"message-{i}");
67+
}
68+
69+
//보너스 : 생산의 속도를 조절할수 있습니다.
70+
int tps = 10;
71+
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
72+
73+
}
74+
}
75+
}

AkkaDotModule.sln

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "TestAkkaDotModule", "TestAk
1818
EndProject
1919
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "AkkaDotBootApi", "AkkaDotBootApi\AkkaDotBootApi.csproj", "{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}"
2020
EndProject
21+
Project("{E53339B2-1760-4266-BCC7-CA923CBCF16C}") = "docker-compose-infra", "Compose\docker-compose-infra.dcproj", "{A325C35E-28E0-43CE-B28F-516DC1568E18}"
22+
EndProject
2123
Global
2224
GlobalSection(SolutionConfigurationPlatforms) = preSolution
2325
Debug|Any CPU = Debug|Any CPU
@@ -36,6 +38,10 @@ Global
3638
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Debug|Any CPU.Build.0 = Debug|Any CPU
3739
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.ActiveCfg = Release|Any CPU
3840
{FE2EF1A7-08AC-4142-BD71-BD9E9FCE09ED}.Release|Any CPU.Build.0 = Release|Any CPU
41+
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
42+
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Debug|Any CPU.Build.0 = Debug|Any CPU
43+
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.ActiveCfg = Release|Any CPU
44+
{A325C35E-28E0-43CE-B28F-516DC1568E18}.Release|Any CPU.Build.0 = Release|Any CPU
3945
EndGlobalSection
4046
GlobalSection(SolutionProperties) = preSolution
4147
HideSolutionNode = FALSE

AkkaDotModule/AkkaDotModule.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
66
<PackageId>AkkaDotModule.Webnori</PackageId>
7-
<Version>1.0.5</Version>
7+
<Version>1.0.6</Version>
88
<RepositoryUrl>https://github.com/psmon/AkkaDotModule</RepositoryUrl>
99
<PackageProjectUrl>https://github.com/psmon/AkkaDotModule</PackageProjectUrl>
1010
</PropertyGroup>

AkkaDotModule/Kafka/ProducerSystem.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ public ProducerSystem()
3939
public void Start(ProducerAkkaOption producerAkkaOption)
4040
{
4141
materializer_producer = producerSystem.Materializer();
42-
producerList[producerAkkaOption.KafkaUrl] = ProducerSettings<Null, string>.Create(producerSystem, null, null)
42+
producerList[producerAkkaOption.ProducerName] = ProducerSettings<Null, string>.Create(producerSystem, null, null)
4343
.WithBootstrapServers(producerAkkaOption.KafkaUrl);
4444
}
4545

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
<?xml version="1.0" encoding="utf-8"?>
2+
<Project ToolsVersion="15.0" Sdk="Microsoft.Docker.Sdk">
3+
<PropertyGroup Label="Globals">
4+
<ProjectVersion>2.1</ProjectVersion>
5+
<DockerTargetOS>Linux</DockerTargetOS>
6+
<ProjectGuid>A325C35E-28E0-43CE-B28F-516DC1568E18</ProjectGuid>
7+
<DockerLaunchAction>None</DockerLaunchAction>
8+
<DockerServiceUrl>{Scheme}://localhost:{ServicePort}/swagger</DockerServiceUrl>
9+
<DockerServiceName>akka-infra</DockerServiceName>
10+
</PropertyGroup>
11+
<ItemGroup>
12+
<None Include="docker-compose.yml" />
13+
<None Include=".dockerignore" />
14+
</ItemGroup>
15+
</Project>

Compose/docker-compose.yml

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
version: '3.4'
2+
3+
services:
4+
mysql:
5+
image: mysql:5.7
6+
command: --default-authentication-plugin=mysql_native_password
7+
restart: always
8+
ports:
9+
- 13306:3306
10+
environment:
11+
MYSQL_ROOT_PASSWORD: "root"
12+
TZ: "Asia/Seoul"
13+
14+
postgres:
15+
image: postgres:9.6
16+
container_name: postgres
17+
environment:
18+
- POSTGRES_USER=docker
19+
- POSTGRES_PASSWORD=docker
20+
ports:
21+
- '5432:5432'
22+
volumes:
23+
- postgre-data:/usr/local/psql/data
24+
25+
redis_auth:
26+
image: bitnami/redis:5.0
27+
environment:
28+
- ALLOW_EMPTY_PASSWORD=yes
29+
ports:
30+
- 7000:6379
31+
zookeeper:
32+
image: 'bitnami/zookeeper:latest'
33+
ports:
34+
- '2181:2181'
35+
environment:
36+
- ALLOW_ANONYMOUS_LOGIN=yes
37+
labels:
38+
io.rancher.scheduler.affinity:host_label: platform=etc01
39+
io.rancher.container.hostname_override: container_name
40+
kafka:
41+
hostname: kafka
42+
image: 'bitnami/kafka:latest'
43+
ports:
44+
- '9092:9092'
45+
environment:
46+
- KAFKA_ADVERTISED_HOST_NAME=kafka
47+
- KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181
48+
- ALLOW_PLAINTEXT_LISTENER=yes
49+
50+
volumes:
51+
elasticsearch-data:
52+
driver: local
53+
postgre-data:
54+
driver: local
55+

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ AKKA의 버전업에 항상 대응하는것이아닌, 유닛테스트를 통해
2323

2424
# 주요 릴리즈 노트
2525

26+
- 1.0.6 : Kafka 도커 인프라추가및, TestAPI 샘플 추가
2627
- 1.0.5 : Kafka Stream 지원 : 액터시스템을 이용하여 Kafka를 더 심플하고 강력하게 사용가능합니다.
2728
- 1.0.4 : AKKA 1.4.7 버전사용
2829
- 1.0.3 : 메시지 우선순위([PriorityMessageMailbox](TestAkkaDotModule/TestActors/PriorityMessageMailboxTest.cs)) 처리기 추가

0 commit comments

Comments
 (0)