Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion bench/seed_eventstore.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@
#
defmodule EventBuilder do
alias EventStore.{EventFactory, UUID}
alias TestEventStore, as: EventStore

partitioned = Application.get_env(:eventstore, EventStore)[:partitioned_events] || false

def seed(total_event_count, events_per_stream, initial_event_number \\ 0)

Expand All @@ -24,7 +27,7 @@ defmodule EventBuilder do
event_count = min(total_event_count, events_per_stream)
events = EventFactory.create_events(event_count, initial_event_number)

:ok = TestEventStore.append_to_stream(stream_uuid, 0, events)
:ok = TestEventStore.append_to_stream(stream_uuid, 0, events, partitioned_events: partitioned)

remaining_event_count = total_event_count - event_count
next_event_number = initial_event_number + event_count
Expand Down
3 changes: 2 additions & 1 deletion bench/storage/append_events_bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,14 @@ defmodule AppendEventsBench do

defp append_events(context, concurrency) do
events = Keyword.fetch!(context, :events)
partitioned = Application.get_env(:eventstore, EventStore)[:partitioned_events] || false

tasks =
Enum.map(1..concurrency, fn _ ->
stream_uuid = UUID.uuid4()

Task.async(fn ->
EventStore.append_to_stream(stream_uuid, 0, events)
EventStore.append_to_stream(stream_uuid, 0, events, partitioned_events: partitioned)
end)
end)

Expand Down
3 changes: 2 additions & 1 deletion bench/storage/subscribe_to_stream_bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ defmodule SubscribeToStreamBench do
defp subscribe_to_stream(context, concurrency, opts \\ []) do
events = Keyword.fetch!(context, :events)
stream_uuid = UUID.uuid4()
partitioned = Application.get_env(:eventstore, EventStore)[:partitioned_events] || false

tasks =
Enum.map(1..concurrency, fn index ->
Expand All @@ -67,7 +68,7 @@ defmodule SubscribeToStreamBench do

append_task =
Task.async(fn ->
:ok = EventStore.append_to_stream(stream_uuid, 0, events)
:ok = EventStore.append_to_stream(stream_uuid, 0, events, partitioned_events: partitioned)
end)

Enum.each([append_task | tasks], &Task.await(&1, @await_timeout_ms))
Expand Down
6 changes: 4 additions & 2 deletions config/bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@ default_config = [
database: "eventstore_bench",
hostname: "localhost",
pool_size: 10,
serializer: EventStore.TermSerializer
serializer: EventStore.TermSerializer,
partitioned_events: true,
use_pg_partman: false
]

config :eventstore, TestEventStore, default_config
config :eventstore, SchemaEventStore, default_config
config :eventstore, SecondEventStore, Keyword.put(default_config, :database, "eventstore_test_2")
config :eventstore, SecondEventStore, Keyword.put(default_config, :database, "eventstore_bench_2")

config :eventstore, event_stores: [TestEventStore]
8 changes: 8 additions & 0 deletions config/config.exs
Original file line number Diff line number Diff line change
@@ -1,3 +1,11 @@
import Config

# Global configuration of EventStore
config :eventstore, EventStore,
partitioned_events: false, # Set to true if you want a partioned events table
use_pg_partman: false # Set to true if you want to use postgresql extension pg_partman

config :eventstore,
event_stores: [DevEventStore]

import_config "#{Mix.env()}.exs"
6 changes: 6 additions & 0 deletions config/dev.exs
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,9 @@ import Config
config :logger, :console, format: "[$level] $message\n"

config :mix_test_watch, clear: true

config :eventstore, DevEventStore,
schema: "event_store",
column_data_type: "jsonb",
partitioned_events: false,
use_pg_partman: false
6 changes: 5 additions & 1 deletion config/test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,13 @@ default_config = [
password: "postgres",
database: "eventstore_test",
hostname: "localhost",
schema: "public",
pool_size: 1,
serializer: EventStore.JsonSerializer,
subscription_retry_interval: 1_000
subscription_retry_interval: 1_000,
partitioned_events: false, # Default false, set to true if you want a partioned events table
use_pg_partman: false,
column_data_type: "jsonb"
]

config :eventstore, TestEventStore, default_config
Expand Down
33 changes: 33 additions & 0 deletions guides/Getting Started.md
Original file line number Diff line number Diff line change
Expand Up @@ -250,3 +250,36 @@ config :my_app, MyApp.EventStore,
```

This will allow the EventStore to use your regular pool settings to connect to the database defined in `url` for most database operations. It will separately establish connections using the `session_mode_url` where necessary which you should point to PgBouncer in session mode or connected directly to the Postgres instance.

## Working with partitioned events table

For performance reasons, due to a very large number of events stored in the `events` table, it may be advisable to partition this table by date (using `created_at` as the partitioning key).

### Enabling partitioning

1. To enable partitioning support, add the `partitioned_events` parameter to the configuration file (e.g. `config/dev.exs`):

```elixir
config :my_app, MyApp.EventStore,
serializer: EventStore.JsonSerializer,
username: "postgres",
password: "postgres",
database: "eventstore",
hostname: "localhost",
partitioned_events: true
```

To enable automatic partition management, it is also possible to use the PostgreSQL extension [pg_partman](https://github.com/pgpartman/pg_partman). After installing pg_partman, add the `use_pg_partman` parameter to the configuration file (e.g. `config/dev.exs`):

```elixir
config :my_app, MyApp.EventStore,
serializer: EventStore.JsonSerializer,
username: "postgres",
password: "postgres",
database: "eventstore",
hostname: "localhost",
partitioned_events: true,
use_pg_partman: true
```


11 changes: 8 additions & 3 deletions lib/event_store.ex
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ defmodule EventStore do

Use a dynamic event store by providing its name as an option to each function:

:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, name: :eventstore1)
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, name: :eventstore1, partitioned_events: :partitioned)

{:ok, events} = EventStore.read_stream_forward(stream_uuid, 0, 1_000, name: :eventstore1)

Expand Down Expand Up @@ -183,7 +183,7 @@ defmodule EventStore do
{:ok, pid} = Postgrex.start_link(config)

Postgrex.transaction(pid, fn conn ->
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn, partitioned_events: :partitioned)
end)

This can also be used with an Ecto `Repo` which is configured to use the
Expand All @@ -194,7 +194,7 @@ defmodule EventStore do

conn = Process.get({Ecto.Adapters.SQL, pool})

:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn)
:ok = EventStore.append_to_stream(stream_uuid, expected_version, events, conn: conn, partitioned_events: :partitioned)
end)

---
Expand Down Expand Up @@ -296,6 +296,11 @@ defmodule EventStore do

@accepted_overrides_append_to_stream [:created_at_override]

def append_to_stream(stream_uuid, expected_version, events) do
partitioned = Application.get_env(:eventstore, EventStore)[:partitioned_events] || false
append_to_stream(stream_uuid, expected_version, events, partitioned_events: partitioned)
end

def append_to_stream(stream_uuid, expected_version, events, opts \\ [])

def append_to_stream(@all_stream, _expected_version, _events, _opts),
Expand Down
8 changes: 8 additions & 0 deletions lib/event_store/config.ex
Original file line number Diff line number Diff line change
Expand Up @@ -154,4 +154,12 @@ defmodule EventStore.Config do
config
|> Keyword.merge(Keyword.get(config, :session_mode_pool, []))
end

# Retrieve the value of the optional partitioned_events parameter,
# which indicates whether the Postgres event table is partitioned.
defp partitioned_events?(config) do
config
|> Keyword.merge(Keyword.get(config, :partitioned_events, false))
end

end
Loading