Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
218 changes: 216 additions & 2 deletions amocrm/v2/entity/custom_field.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
from collections import namedtuple
from datetime import datetime

from .. import fields, manager, model
Expand All @@ -17,9 +16,9 @@
DATETIME = "date_time" # Дата и время

MULTITEXT = "multitext"
LEGAL_ENTITY = "legal_entity" # Юр. лицо
# smart_address Адрес
# birthday День рождения
# legal_entity Юр. лицо
# PRICE = "price" # Цена
# CATEGORY = "category" # Категория
# ITEMS = "items" # Предметы
Expand All @@ -31,6 +30,91 @@ def __init__(self, id=None, value=None):
self.value = value


_LEGAL_ENTITY_FIELDS = (
"name",
"entity_type",
"address",
"real_address",
"bank_account_number",
"director",
"vat_id",
"tax_registration_reason_code",
"kpp",
"bank_code",
"unp",
"bin",
"egrpou",
"mfo",
"oked",
"external_uid",
)


class LegalEntityValue:
__slots__ = ("_data", "_field", "_instance", "_container")

def __init__(self, data=None, field=None, instance=None, container=None, **kwargs):
sanitized = {}
if data:
sanitized.update(
{key: value for key, value in data.items() if key in _LEGAL_ENTITY_FIELDS and value is not None}
)
for key, value in kwargs.items():
if key in _LEGAL_ENTITY_FIELDS and value is not None:
sanitized[key] = value
object.__setattr__(self, "_data", sanitized)
object.__setattr__(self, "_field", field)
object.__setattr__(self, "_instance", instance)
object.__setattr__(self, "_container", container)

def __getattr__(self, item):
if item in _LEGAL_ENTITY_FIELDS:
return self._data.get(item)
raise AttributeError(item)

def __setattr__(self, key, value):
if key in _LEGAL_ENTITY_FIELDS:
if value is None:
self._data.pop(key, None)
else:
self._data[key] = value
self._apply_changes()
return
object.__setattr__(self, key, value)

def __repr__(self):
attrs = ", ".join(f"{field}={self._data.get(field)!r}" for field in _LEGAL_ENTITY_FIELDS if field in self._data)
return f"{self.__class__.__name__}({attrs})"

def attach(self, field, instance, container=None):
object.__setattr__(self, "_field", field)
object.__setattr__(self, "_instance", instance)
object.__setattr__(self, "_container", container)
return self

def detach(self):
object.__setattr__(self, "_field", None)
object.__setattr__(self, "_instance", None)
object.__setattr__(self, "_container", None)
return self

def copy(self):
return LegalEntityValue(**self.to_dict())

def to_dict(self):
return dict(self._data)

@classmethod
def from_dict(cls, data, field=None, instance=None, container=None):
return cls(data=data or {}, field=field, instance=instance, container=container)

def _apply_changes(self):
if self._container is not None:
self._container["value"] = self.to_dict()
if self._field and self._instance:
self._field._notify_instance(self._instance)


class _FieldsRegisterMeta(type):
_REGISTER = {}

Expand Down Expand Up @@ -254,6 +338,136 @@ def on_get(self, values):
return datetime.fromtimestamp(values[0]["value"])


class LegalEntityCustomField(BaseCustomField):
type = LEGAL_ENTITY
_real_code = "LEGAL_ENTITY"
_value_fields = _LEGAL_ENTITY_FIELDS

def _find(self, instance):
field = super()._find(instance)
if field:
return field
for field in CustomFieldModel.get_for(instance):
if field.type == self.type:
return field
return None

def _get_raw_field(self, data):
field = super()._get_raw_field(data)
if field or not data:
return field
for item in data:
if item.get("field_type") == self.type:
return item
return None

def _matches_entry(self, entry):
if entry.get("field_id") and self._field_id:
return entry["field_id"] == self._field_id
if self._code and entry.get("field_code"):
return entry["field_code"] == self._code
if self._name:
return entry.get("field_name") == self._name
return entry.get("field_type") == self.type

def _get_all_field_entries(self, data):
if not data:
return []
return [item for item in data if self._matches_entry(item)]

def _fan_out_values(self, instance):
data = instance._data.get(self.name)
if not data:
return
entries = self._get_all_field_entries(data)
for entry in list(entries):
values = entry.get("values") or []
if len(values) <= 1:
continue
index = data.index(entry)
template = {key: value for key, value in entry.items() if key != "values"}
replicas = []
for value in values:
replicas.append({**template, "values": [value]})
data[index : index + 1] = replicas

def _notify_instance(self, instance):
self._fan_out_values(instance)
super()._notify_instance(instance)

def on_get_instance(self, instance, data):
if data is None:
return []
entries = self._get_all_field_entries(data)
if not entries:
return []
result = []
for entry in entries:
for container in entry.get("values", []):
raw_value = container.get("value") if isinstance(container, dict) else container
normalized = self._normalize_value(raw_value)
if not normalized:
continue
container["value"] = normalized
result.append(LegalEntityValue.from_dict(normalized, field=self, instance=instance, container=container))
return result

@staticmethod
def _normalize_value(raw):
if isinstance(raw, dict):
return {key: value for key, value in raw.items() if key in _LEGAL_ENTITY_FIELDS and value is not None}
if isinstance(raw, list):
data = {}
for item in raw:
subtype = item.get("subtype")
if not subtype:
continue
if subtype in data:
continue
data[subtype] = item.get("value")
return {key: value for key, value in data.items() if key in _LEGAL_ENTITY_FIELDS and value is not None}
if raw:
return {"name": raw}
return {}

def on_get(self, values):
if not values:
return []
result = []
for index, value in enumerate(values):
if isinstance(value, dict):
raw_value = value.get("value")
else:
raw_value = value
value = {"value": raw_value}
values[index] = value
normalized = self._normalize_value(raw_value)
if not normalized:
continue
value["value"] = normalized
result.append(LegalEntityValue.from_dict(normalized, container=value))
return result

def on_set(self, value):
items = value if isinstance(value, (list, tuple)) else [value]
prepared_values = [self._prepare_value_payload(item) for item in items]
if not prepared_values:
return []
return [{"value": item} for item in prepared_values]

def _prepare_value_payload(self, item):
if isinstance(item, LegalEntityValue):
payload = item.to_dict()
elif isinstance(item, dict):
payload = {key: item.get(key) for key in self._value_fields if key in item and item.get(key) is not None}
else:
raise TypeError("LegalEntityCustomField accepts LegalEntityValue, dict, or list of them")
name = payload.get("name")
if not name:
raise ValueError("LegalEntityCustomField requires 'name' to be set")
return payload


class ContactPhoneField(TextCustomField):
type = MULTITEXT
_real_code = "PHONE"
Expand Down