angee.messaging.managers
Managers that own the messaging write path — the channel-sync ingest.
A channel backend parses a source into neutral ParsedMessage rows; these managers turn each into a :class:~angee.messaging.models.Message with its thread, its recursive :class:~angee.messaging.models.Part tree (text content-addressed into :class:~angee.messaging.models.Fragment\s), its participants, and its quotation edges. They encode the invariants a high-volume email sync depends on:
(platform, external_id)update_or_createkeys make re-sync idempotent.- null bytes (
\x00) are stripped before every write (Postgres rejects them). - thread resolution is the 4-step RFC-5322 priority under
select_for_update. - denormalised counters bump with
F(), never read-modify-write. - the quotation graph FK-joins on shared fragments, skipping boilerplate quoted by more than :data:
_BOILERPLATE_CUTOFFmessages.
The sync runs under system_context; created_by is set to the channel owner.
strip_null_bytes
def strip_null_bytes(value: Any) -> AnyRecursively remove \x00 from strings inside str/dict/list values.
Email bodies routinely contain null bytes, which Postgres rejects in text/JSON columns; stripping them on the write path keeps a large sync from hard-failing.
normalize_subject
def normalize_subject(subject: str) -> strStrip repeated Re:/Fwd:/… prefixes and collapse whitespace for matching.
message_subtype_options
def message_subtype_options(
model_label: str = "") -> tuple[dict[str, Any], ...]Return follower-selectable subtype options for model_label.
The option list is deterministic even before any message has created subtype rows. Existing global/model rows override labels and flags for their key.
FragmentManager
class FragmentManager(AngeeManager)Content-addressed text store: one row per distinct (null-stripped) text.
upsert
def upsert(*, text: str, kind: str = "paragraph", owner_id: Any = None) -> AnyGet-or-create a fragment by the SHA-256 of its cleaned (null-stripped, trimmed) text.
ThreadManager
class ThreadManager(AngeeManager)Owns thread resolution — the 4-step RFC-5322 priority under a row lock.
resolve
def resolve(*,
platform: str,
channel: Any,
subject: str = "",
in_reply_to: str = "",
references: tuple[str, ...] = (),
message_external_id: str = "",
owner_id: Any = None,
modality: Any = None,
visibility: Any = None) -> AnyResolve the thread a message belongs to, creating one if needed.
Priority: In-Reply-To → References (newest-first, i.e. right-to-left, resolved in one batch query) → normalised subject → a new thread. The subject match and the create run under select_for_update on a deterministic external id (subj:<normalized> or msg:<id>) so two concurrent batches resolving the same subject collide on the unique constraint and converge to one thread instead of double-creating.
A message with no threading hint and no subject keys on msg:<external_id> — its own one-message thread, never merged with another, because there is no key to merge on (collapsing keyless messages would fuse unrelated mail). The inbox groups such threads individually; they read as standalone conversations.
modality/visibility land a newly created thread under a non-email :class:~angee.messaging.models.Thread.Modality / :class:~angee.messaging.models.Thread.Visibility — a public feed passes PUBLIC_THREAD/PUBLIC so the row is born public instead of being bulk-updated afterward. Each defaults to the private email-thread shape and is ignored when an existing thread is reused (an established thread keeps its own).
ThreadAttachmentManager
class ThreadAttachmentManager(AngeeManager)Owns the polymorphic edge from a model row to its chatter thread.
for_record
def for_record(record: Any, *, role: str = "chatter") -> Any | NoneReturn the existing thread attachment for record and role.
ensure_for_record
def ensure_for_record(record: Any,
*,
role: str = "chatter",
subject: str = "") -> AnyReturn record's attachment, creating its private chatter thread if needed.
Both the thread and the attachment are resolved with get_or_create on a deterministic key (the thread on its record:…:role external id, the attachment on the (content_type, object_id, role) unique constraint), so two concurrent first-posts converge on one row instead of the second raising an IntegrityError — select_for_update cannot lock a row that does not exist yet, so a lock-then-create cannot serialise the first insert.
teardown_for_record
def teardown_for_record(record: Any) -> NoneDelete every chatter thread attached to record and its whole subtree.
A record's chatter thread is private to that record, so a hard delete of the record collects the thread graph with it — no orphaned thread survives to be mis-resolved when a later row reuses the primary key. Deleting each Thread cascades its attachments, followers, activities, notifications, and participants; its messages FK the thread with SET_NULL (an ingested email message outlives a merged thread), so a private record thread's messages are deleted explicitly first.
ThreadFollowerQuerySet
class ThreadFollowerQuerySet(AngeeQuerySet[Any])Chainable read scopes for record chatter followers.
for_attachment
def for_attachment(attachment: Any) -> ThreadFollowerQuerySetReturn followers bound to one record's chatter attachment edge.
ThreadFollowerManager
class ThreadFollowerManager(AngeeManager.from_queryset(ThreadFollowerQuerySet)
)Owns user subscriptions to model-attached chatter threads.
for_record
def for_record(record: Any, *, role: str = "chatter") -> AnyReturn followers for record and role.
is_following
def is_following(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter") -> boolReturn whether user follows record's chatter thread.
subscribe
def subscribe(
record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter",
notification_policy: str = "inbox",
subtype_keys: tuple[str, ...] = ()) -> AnyEnsure user follows record's chatter thread.
unsubscribe
def unsubscribe(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter") -> intRemove user from record's chatter followers.
ThreadNotificationQuerySet
class ThreadNotificationQuerySet(AngeeQuerySet[Any])Chainable read scopes for per-recipient notification/read state.
DELIVERY_ERROR_STATUSES
Notification statuses that mean the author has a delivery error.
for_attachment
def for_attachment(attachment: Any) -> ThreadNotificationQuerySetReturn notifications bound to one record's chatter attachment edge.
unread
def unread() -> ThreadNotificationQuerySetReturn notifications the recipient has not yet read.
delivery_errors
def delivery_errors() -> ThreadNotificationQuerySetReturn notifications whose delivery bounced or raised an exception.
ThreadNotificationManager
class ThreadNotificationManager(
AngeeManager.from_queryset(ThreadNotificationQuerySet))Owns per-recipient notification/read state for record chatter messages.
for_record
def for_record(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter",
unread_only: bool = False) -> AnyReturn notifications for user on record and role.
unread_count_for_record
def unread_count_for_record(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter") -> intReturn the unread notification count for user on record.
mark_read_for_record
def mark_read_for_record(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter") -> intMark user's unread notifications on record as read.
needaction_for_message
def needaction_for_message(message: Any,
*,
user: Any = None,
user_id: Any = None) -> boolReturn whether message has an unread notification for user.
mark_read_for_message
def mark_read_for_message(message: Any,
*,
user: Any = None,
user_id: Any = None) -> intMark one message's unread notification for user as read.
error_count_for_record
def error_count_for_record(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter") -> intReturn the delivery-error count authored by user on record.
mark_failed
def mark_failed(notification: Any,
*,
status: str = "exception",
failure_type: str = "unknown",
failure_reason: str = "") -> AnyMark one notification as a delivery failure.
mark_failed_for_message
def mark_failed_for_message(message: Any,
*,
user: Any = None,
user_id: Any = None,
status: str = "exception",
failure_type: str = "unknown",
failure_reason: str = "") -> AnyMark one message notification for user as failed.
fanout_for_message
def fanout_for_message(
message: Any,
*,
attachment: Any | None = None,
subtype_key: str = "",
owner_id: Any = None,
recipient_user_ids: tuple[Any, ...] = ()) -> intCreate follower and explicit-recipient notifications for one message.
ThreadActivityQuerySet
class ThreadActivityQuerySet(AngeeQuerySet[Any])Chainable read scopes for scheduled chatter activities.
open
def open() -> ThreadActivityQuerySetReturn activities still to do (not yet done or cancelled).
ThreadActivityManager
class ThreadActivityManager(AngeeManager.from_queryset(ThreadActivityQuerySet)
)Owns scheduled activities attached to model chatter threads.
for_record
def for_record(record: Any,
*,
role: str = "chatter",
include_done: bool = True) -> AnyReturn activities for record and role.
schedule
def schedule(record: Any,
*,
user: Any = None,
user_id: Any = None,
role: str = "chatter",
summary: str,
note: str = "",
due_date: Any = None,
activity_type: str = "todo",
metadata: dict[str, Any] | None = None) -> AnyCreate a scheduled activity for record.
complete
def complete(activity: Any,
*,
feedback: str = "",
post_message: bool = True) -> AnyMark an activity done and optionally log that completion to the thread.
cancel
def cancel(activity: Any) -> AnyCancel an activity without posting a completion message.
MessageStarManager
class MessageStarManager(AngeeManager)Owns per-user Odoo-style starred message state.
is_starred
def is_starred(message: Any, *, user: Any | None) -> boolReturn whether user has starred message.
set_starred
def set_starred(message: Any,
*,
user: Any,
starred: bool | None = None) -> boolSet or toggle user's star on message and return the new state.
unstar_all
def unstar_all(*, user: Any) -> intRemove all stars owned by user.
ReactionManager
class ReactionManager(AngeeManager)Owns the attributed-reaction write — the row shape for a (message, handle, reaction).
MessageManager.set_reaction is the user-keyed chatter toggle; this is the distinct attributed write the social feed overlay lands for each external reactor. The row shape (fields + created_by default) lives here with the table owner so a producer batches through this owner instead of hand-rolling its own get_or_create.
attribute
def attribute(reactions: Any, *, owner_id: Any = None) -> intLand attributed reactions in one insert; return how many rows were built.
reactions is an iterable of (message, handle, reaction) triples. One bulk_create inserts the batch, idempotent on the partial unique (message, handle, reaction) constraint via ignore_conflicts — so a re-sync re-landing the same reactions is a no-op — and every row carries the cleaned reaction content and the created_by (the field-backed REBAC owner).
MessageQuerySet
class MessageQuerySet(AngeeQuerySet[Any])Chainable read scopes for chatter/ingest messages.
for_thread
def for_thread(thread: Any) -> MessageQuerySetReturn messages belonging to one thread.
visible_in_chatter
def visible_in_chatter() -> MessageQuerySetReturn messages a record's chatter feed shows (drop user notifications).
searching
def searching(term: str) -> MessageQuerySetReturn messages matching one Odoo-style chatter search token.
MessageManager
class MessageManager(AngeeManager.from_queryset(MessageQuerySet))Owns the message ingest write path (idempotent, null-safe, F()-counted).
for_record
def for_record(record: Any,
*,
role: str = "chatter",
search: str = "",
limit: int = 50,
before: Any | None = None,
after: Any | None = None,
around: Any | None = None) -> tuple[list[Any], int]Return fetched chatter messages for a record, optionally search-filtered.
post_to_thread
def post_to_thread(
thread: Any,
*,
body: str,
subject: str = "",
owner_id: Any = None,
attachment: Any | None = None,
attachments: tuple[Any, ...] = (),
message_type: Message.MessageKind | None = None,
subtype_key: str = "comment",
subtype_model_label: str = "",
parent: Any | None = None,
tracking_values: tuple[TrackingChange | dict[str, Any], ...] = (),
recipient_user_ids: tuple[Any, ...] = ()
) -> AnyCreate an internal user-authored message in thread and bump thread counters.
message_type defaults to :attr:Message.MessageKind.COMMENT; the enum is the single source of truth for the stored kind, so None resolves to it here.
set_reaction
def set_reaction(message: Any,
*,
reaction: str,
action: str = "toggle",
user: Any) -> AnyAdd, remove, or toggle the current user's reaction on message.
update_content
def update_content(message: Any, *, body: str, owner_id: Any = None) -> AnyUpdate a user-authored comment body, preserving Odoo's edit guardrails.
unlink_from_thread
def unlink_from_thread(message: Any, *, thread: Any) -> AnyDelete message from thread and repair thread denormalisations.
ingest
def ingest(parsed_messages: list[ParsedMessage],
*,
channel: Any,
owner_id: Any = None,
modality: Any = None,
visibility: Any = None,
message_kind: Any = None,
quote_edges: bool = True) -> list[Any]Upsert each parsed message into a thread with its parts/participants/edges.
Returns the landed :class:~angee.messaging.models.Message rows (a caller wanting the count takes len(...)) so an overlay — a public-feed engagement pass — reuses the rows this write already resolved instead of re-querying them by (platform, external_id).
Idempotent on (platform, external_id); null bytes stripped; thread counters bumped with F(). modality/visibility land each resolved thread under a non-email :class:~angee.messaging.models.Thread.Modality / :class:~angee.messaging.models.Thread.Visibility (a public feed passes PUBLIC_THREAD/PUBLIC); each defaults to the private email-thread shape. message_kind is the :class:~angee.messaging.models.Message.MessageKind each message lands under (this manager owns writing the column) — it defaults to EMAIL, and a public-feed producer passes COMMENT so a public post is not mislabelled email. quote_edges runs the RFC-5322 quotation builder — email's shared-fragment graph — and defaults on; a non-email producer whose short shared text would otherwise mint spurious quote edges passes quote_edges=False.
PartQuerySet
class PartQuerySet(AngeeQuerySet[Any])Chainable read scopes for message body parts.
attachments
def attachments() -> PartQuerySetReturn parts that carry a stored file — a message's attachment parts.
PartManager
class PartManager(AngeeManager.from_queryset(PartQuerySet))Owns the recursive body-part rows; reads compose the PartQuerySet scopes.
MessageEdgeManager
class MessageEdgeManager(AngeeManager)Owns the cross-message graph — derived quote edges from shared fragments.
relate
def relate(src: Any,
dst: Any,
*,
kind: Any,
owner_id: Any,
fragment: Any = None,
confidence: float = 1.0) -> AnyWrite one typed edge from src to dst, idempotent on the (src, dst, kind) key.
The single edge-write entry point on the table owner: a social producer relating two messages (mention/crosspost/forward) writes through this one shape instead of its own get_or_create, and the batched quotation builder lands the same :meth:_edge_fields columns. src/dst/fragment accept a row or its id; returns the edge, creating it only when the (src, dst, kind) triple is new.
create_for_message
def create_for_message(message: Any) -> intWrite quote edges from message to others sharing a non-boilerplate fragment.
Skips fragments quoted by more than :data:_BOILERPLATE_CUTOFF messages; edge direction runs from the earlier message to the later one.