Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 71 additions & 17 deletions lib/arke_postgres.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,36 @@ defmodule ArkePostgres do
case check_env() do
{:ok, nil} ->
try do
projects =
Query.get_project_record()
|> Enum.sort_by(&(to_string(&1.id) == "arke_system"), :desc)

projects =Query.get_project_record() |> Enum.sort_by(&(to_string(&1.id) == "arke_system"),:desc)
Enum.each(projects, fn %{id: project_id} = _project ->
start_managers(project_id)
end)

:ok
rescue
err in DBConnection.ConnectionError ->
%{message: message,reason: reason} = err
parsed_message = %{context: "db_connection_error", message: "error: #{err}, msg: #{message}"}
IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ])
%{message: message, reason: reason} = err

parsed_message = %{
context: "db_connection_error",
message: "error: #{err}, msg: #{message}"
}

IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan])
:error

err in Postgrex.Error ->
%{message: message,postgres: %{code: code, message: postgres_message}} = err
parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"}
IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ])
%{message: message, postgres: %{code: code, message: postgres_message}} = err

parsed_message = %{
context: "postgrex_error",
message: "#{message || postgres_message}"
}

IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan])
:error
end

Expand Down Expand Up @@ -69,18 +82,20 @@ defmodule ArkePostgres do
end
end

# Normalizes a binary project id to an atom before delegating to the
# atom-based clause below. (The diff had left both the old and new form
# of this clause in place; collapsed to a single definition.)
# NOTE(review): String.to_atom/1 on unbounded input can exhaust the atom
# table; consider String.to_existing_atom/1 if project ids are known at
# boot — confirm against callers.
defp start_managers(project_id) when is_binary(project_id),
  do: start_managers(String.to_atom(project_id))

# Loads the manager units for a project and registers each collection with
# its Arke manager kind. (The diff had left both the old and the
# reformatted call triplet in place — calls deduplicated to a single set.)
defp start_managers(project_id) do
  {parameters, arke_list, groups} = Query.get_manager_units(project_id)

  Arke.handle_manager(parameters, project_id, :parameter)
  Arke.handle_manager(arke_list, project_id, :arke)
  Arke.handle_manager(groups, project_id, :group)
end

def create(project, %{arke_id: arke_id} = unit) do
arke = Arke.Boundary.ArkeManager.get(arke_id, project)

case handle_create(project, arke, unit) do
{:ok, unit} ->
{:ok,
Expand Down Expand Up @@ -149,6 +164,42 @@ defmodule ArkePostgres do
{:error, "arke type not supported"}
end

# Updates the key fields of a unit, dispatching on the arke resolved from
# the unit's arke_id and project.
#
# Returns handle_update_key/3's result directly: the previous
# `{:ok, unit} = ...` match bound an unused variable and crashed with a
# MatchError when the arke type was unsupported ({:error, _}) instead of
# propagating the error tuple to the caller.
def update_key(%{arke_id: arke_id, metadata: %{project: project}} = old_unit, new_unit) do
  arke = Arke.Boundary.ArkeManager.get(arke_id, project)
  handle_update_key(arke, old_unit, new_unit)
end

# Key update for a "table"-typed arke: SET payload is the new unit minus
# its primary keys, WHERE clause is the primary keys themselves.
#
# The head's `data` binding was shadowed by the local `data =` below
# before ever being used (unused-variable warning) — it is now explicitly
# ignored with an underscore.
def handle_update_key(
      %{data: %{type: "table"}} = arke,
      _old_unit,
      %{data: _data, metadata: %{project: project} = metadata} = new_unit
    ) do
  data =
    new_unit
    |> filter_primary_keys(false)
    # todo: remove once the project is not needed anymore
    |> Map.merge(%{metadata: Map.delete(metadata, :project)})
    |> data_as_klist

  where = new_unit |> filter_primary_keys(true) |> data_as_klist

  Table.update(project, arke, data, where)
  {:ok, new_unit}
end

# Key update for an "arke"-typed arke: the unit lives in the arke_unit
# table, so the write is delegated to ArkeUnit.update_key/3; the new unit
# is echoed back in an :ok tuple.
def handle_update_key(%{data: %{type: "arke"}} = arke, old_unit, new_unit) do
  ArkeUnit.update_key(arke, old_unit, new_unit)
  {:ok, new_unit}
end

# Catch-all clause: any arke whose type is neither "table" nor "arke"
# cannot have its keys updated here.
def handle_update_key(_arke, _old_unit, _new_unit),
  do: {:error, "arke type not supported"}

def delete(project, %{arke_id: arke_id} = unit) do
arke = Arke.Boundary.ArkeManager.get(arke_id, project)
handle_delete(project, arke, unit)
Expand Down Expand Up @@ -204,7 +255,8 @@ defmodule ArkePostgres do
Enum.to_list(data)
end

# Pass a plain binary error message through untouched. (The diff had left
# both the unformatted old line and the formatted new line — collapsed to
# a single clause.)
# NOTE(review): "errros" is a typo, kept because the sibling clause and
# existing callers in this file use the same name.
defp handle_changeset_errros(errors) when is_binary(errors), do: errors

defp handle_changeset_errros(errors) do
Enum.map(errors, fn {field, detail} ->
"#{field}: #{render_detail(detail)}"
Expand Down Expand Up @@ -234,15 +286,17 @@ defmodule ArkePostgres do
IO.inspect("DBConnection.ConnectionError")
%{message: message} = err
parsed_message = %{context: "db_connection_error", message: "#{message}"}
IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ])
IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan])
:error

err in Postgrex.Error ->
IO.inspect("Postgrex.Error")
%{message: message,postgres: %{code: code, message: postgres_message}} = err
%{message: message, postgres: %{code: code, message: postgres_message}} = err
parsed_message = %{context: "postgrex_error", message: "#{message || postgres_message}"}
IO.inspect(parsed_message,syntax_colors: [string: :red,atom: :cyan, ])
IO.inspect(parsed_message, syntax_colors: [string: :red, atom: :cyan])
:error
err ->

err ->
IO.inspect("uncatched error")
IO.inspect(err)
:error
Expand Down
54 changes: 53 additions & 1 deletion lib/arke_postgres/arke_unit.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

defmodule ArkePostgres.ArkeUnit do
alias Arke.Boundary.ArkeManager
import Ecto.Query, only: [from: 2]
import Ecto.Query
alias Arke.Utils.ErrorGenerator, as: Error

@record_fields [:id, :data, :metadata, :inserted_at, :updated_at]
Expand Down Expand Up @@ -60,6 +60,48 @@ defmodule ArkePostgres.ArkeUnit do
ArkePostgres.Repo.update_all(query, [], prefix: project)
end

# Persists a partial key update for `unit` in the "arke_unit" table.
#
# Only the fields of the new `data` whose values differ from
# `old_unit.data` are written; each changed key is applied through a
# nested PostgreSQL jsonb_set/3 fragment so the untouched keys of the
# stored JSONB document are preserved.
#
# NOTE(review): the head matches `metadata` with an atom :project key,
# but the update below deletes the *string* key "project" from
# unit.metadata — confirm which key shape is actually stored.
# NOTE(review): the `metadata` binding from the head is never used (only
# unit.metadata is read), so the compiler will warn about it.
def update_key(
      arke,
      old_unit,
      %{data: data, metadata: %{project: project} = metadata} = unit,
      where \\ []
    ) do
  # Always scope the update to this unit's arke_id and id; callers may
  # pre-seed `where` with additional conditions.
  where = Keyword.put_new(where, :arke_id, Atom.to_string(unit.arke_id))
  where = Keyword.put_new(where, :id, Atom.to_string(unit.id))

  # Keys of the new data whose values changed relative to the old unit.
  diff_keys = diff_keys(old_unit.data, data, Map.keys(data))

  encoded_data = encode_unit_data(arke, diff_keys)

  # Build the jsonb_set operations for each changed key
  # (each iteration wraps the previous dynamic, producing nested
  # jsonb_set(jsonb_set(...)) calls starting from the current u.data).
  data_dynamic =
    Enum.reduce(encoded_data, dynamic([u], u.data), fn {param_id, new_value}, acc ->
      dynamic(
        [u],
        fragment(
          "jsonb_set(?, ?, ?::jsonb, true)",
          ^acc,
          ^[to_string(param_id)],
          ^new_value
        )
      )
    end)

  query =
    from(u in "arke_unit",
      where: ^where,
      update: [
        set: [
          data: ^data_dynamic,
          metadata: ^Map.delete(unit.metadata, "project"),
          updated_at: ^unit.updated_at
        ]
      ]
    )

  # Executed against the project's schema via the Postgres prefix option;
  # returns whatever Repo.update_all/3 returns (count + returning).
  ArkePostgres.Repo.update_all(query, [], prefix: project)
end

def delete(project, arke, unit) do
where = [arke_id: Atom.to_string(arke.id), id: Atom.to_string(unit.id)]
query = from(a in "arke_unit", where: ^where)
Expand Down Expand Up @@ -130,4 +172,14 @@ defmodule ArkePostgres.ArkeUnit do
{map, data} = Map.pop(data, key, %{})
{map || %{}, data}
end

# Returns the {key, value} pairs of `new_data` that are listed in `keys`
# and whose value differs from the corresponding entry in `old_data`.
#
# Pairs come out in reverse enumeration order of `new_data`, matching the
# accumulator-prepend behavior of the previous implementation.
def diff_keys(old_data, new_data, keys) do
  new_data
  |> Enum.filter(fn {key, value} -> key in keys and old_data[key] != value end)
  |> Enum.reverse()
end
end