angee.messaging.managers

Managers that own the messaging write path — the channel-sync ingest.

A channel backend parses a source into neutral ParsedMessage rows; these managers turn each into a :class:~angee.messaging.models.Message with its thread, its recursive :class:~angee.messaging.models.Part tree (text content-addressed into :class:~angee.messaging.models.Fragment rows), its participants, and its quotation edges. They encode the invariants a high-volume email sync depends on:

  • (platform, external_id) update_or_create keys make re-sync idempotent.
  • null bytes (\x00) are stripped before every write (Postgres rejects them).
  • thread resolution is the 4-step RFC-5322 priority under select_for_update.
  • denormalised counters bump with F(), never read-modify-write.
  • the quotation graph FK-joins on shared fragments, skipping boilerplate quoted by more than :data:_BOILERPLATE_CUTOFF messages.

The sync runs under system_context; created_by is set to the channel owner.
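
The counter invariant above can be illustrated without Django. The sketch below uses an in-memory SQLite table as a stand-in; the table and column names (thread, message_count) are assumptions made for the example, not the real schema. An F() expression inside a Django update() compiles to the same kind of single UPDATE, so the increment is computed by the database rather than from a value read earlier in Python.

```python
import sqlite3

# Hypothetical stand-in schema: one thread row with a denormalised counter.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE thread ("
    "id INTEGER PRIMARY KEY, "
    "message_count INTEGER NOT NULL DEFAULT 0)"
)
conn.execute("INSERT INTO thread (id) VALUES (1)")


def bump_message_count(conn: sqlite3.Connection, thread_id: int, by: int = 1) -> None:
    """Increment the counter inside one UPDATE statement.

    No Python-side read happens, so two writers cannot overwrite each
    other's increment the way a read-modify-write would.
    """
    conn.execute(
        "UPDATE thread SET message_count = message_count + ? WHERE id = ?",
        (by, thread_id),
    )


bump_message_count(conn, 1)
bump_message_count(conn, 1, by=2)
count = conn.execute("SELECT message_count FROM thread WHERE id = 1").fetchone()[0]
```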

strip_null_bytes

python
def strip_null_bytes(value: Any) -> Any

Recursively remove \x00 from strings inside str/dict/list values.

Email bodies routinely contain null bytes, which Postgres rejects in text/JSON columns; stripping them on the write path keeps a large sync from hard-failing.
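
A minimal sketch of what such a recursive strip might look like, written from the one-line description above rather than from the actual source. Cleaning dict keys as well as values is an assumption; the name strip_null_bytes_sketch is hypothetical.

```python
from typing import Any


def strip_null_bytes_sketch(value: Any) -> Any:
    """Recursively drop NUL characters from strings nested in dicts and lists."""
    if isinstance(value, str):
        return value.replace("\x00", "")
    if isinstance(value, dict):
        # Assumption: keys are cleaned too, since JSON keys are also text.
        return {
            strip_null_bytes_sketch(key): strip_null_bytes_sketch(item)
            for key, item in value.items()
        }
    if isinstance(value, list):
        return [strip_null_bytes_sketch(item) for item in value]
    # Any other type (int, None, bytes, ...) passes through unchanged.
    return value
```

Because non-string leaves are returned as-is, the helper can be applied to a whole parsed payload before it reaches a text or JSON column.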

normalize_subject

python
def normalize_subject(subject: str) -> str

Strip repeated Re:/Fwd:/… prefixes and collapse whitespace for matching.
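
One way to implement this kind of normalisation is sketched below. The prefix list (Re, Fw, Fwd) is an assumption; the "…" in the description suggests the real function recognises more, and those are not guessed at here. The function name is hypothetical.

```python
import re

# Assumption: only Re:/Fw:/Fwd: are handled in this sketch.
_REPLY_PREFIX = re.compile(r"^\s*(?:re|fwd?)\s*:\s*", re.IGNORECASE)


def normalize_subject_sketch(subject: str) -> str:
    """Strip leading reply/forward prefixes repeatedly, then collapse whitespace."""
    previous = None
    while previous != subject:
        # Keep stripping until a pass changes nothing, so "Re: Fwd: Re:" is fully removed.
        previous = subject
        subject = _REPLY_PREFIX.sub("", subject)
    # split() with no arguments drops leading/trailing space and collapses runs.
    return " ".join(subject.split())
```

The anchored pattern only removes prefixes at the start of the string, so a subject such as "Report: done" is left intact.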

message_subtype_options

python
def message_subtype_options(
        model_label: str = "") -> tuple[dict[str, Any], ...]

Return follower-selectable subtype options for model_label.

The option list is deterministic even before any message has created subtype rows. Existing global/model rows override labels and flags for their key.

FragmentManager

python
class FragmentManager(AngeeManager)

Content-addressed text store: one row per distinct (null-stripped) text.

upsert

python
def upsert(*, text: str, kind: str = "paragraph", owner_id: Any = None) -> Any

Get-or-create a fragment by the SHA-256 of its cleaned (null-stripped, trimmed) text.
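
The content-addressing idea can be sketched with the standard library. The UTF-8 encoding, the hex digest form, and the order of cleaning (strip nulls, then trim) are assumptions; the in-memory dict stands in for the fragment table, and both function names are hypothetical.

```python
import hashlib


def fragment_digest(text: str) -> str:
    """Hash the cleaned text; equal cleaned text always yields the same key."""
    cleaned = text.replace("\x00", "").strip()
    # Assumption: UTF-8 bytes and a hex digest; the stored form is not documented here.
    return hashlib.sha256(cleaned.encode("utf-8")).hexdigest()


def upsert_fragment(store: dict[str, str], text: str) -> tuple[str, bool]:
    """Get-or-create against an in-memory store; returns (digest, created)."""
    digest = fragment_digest(text)
    created = digest not in store
    if created:
        store[digest] = text.replace("\x00", "").strip()
    return digest, created
```

Two bodies that differ only in surrounding whitespace or embedded null bytes map to the same row, which is what lets quoted text be shared across messages.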

ThreadManager

python
class ThreadManager(AngeeManager)

Owns thread resolution — the 4-step RFC-5322 priority under a row lock.

resolve

python
def resolve(*,
            platform: str,
            channel: Any,
            subject: str = "",
            in_reply_to: str = "",
            references: tuple[str, ...] = (),
            message_external_id: str = "",
            owner_id: Any = None,
            modality: Any = None,
            visibility: Any = None) -> Any

Resolve the thread a message belongs to, creating one if needed.

Priority: In-Reply-To → References (newest-first, i.e. right-to-left, resolved in one batch query) → normalised subject → a new thread. The subject match and the create run under select_for_update on a deterministic external id (subj:<normalized> or msg:<id>) so two concurrent batches resolving the same subject collide on the unique constraint and converge to one thread instead of double-creating.

A message with no threading hint and no subject keys on msg:<external_id> — its own one-message thread, never merged with another, because there is no key to merge on (collapsing keyless messages would fuse unrelated mail). The inbox groups such threads individually; they read as standalone conversations.

modality/visibility land a newly created thread under a non-email :class:~angee.messaging.models.Thread.Modality / :class:~angee.messaging.models.Thread.Visibility — a public feed passes PUBLIC_THREAD/PUBLIC so the row is born public instead of being bulk-updated afterward. Each defaults to the private email-thread shape and is ignored when an existing thread is reused (an established thread keeps its own).
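
The four-step priority can be sketched as a pure function over an in-memory index. This leaves out locking, batching, and the database entirely; the index shape (a dict from message id to thread external id) and the function name are assumptions made for illustration.

```python
def resolve_thread_external_id(
    thread_by_message_id: dict[str, str],
    *,
    in_reply_to: str = "",
    references: tuple[str, ...] = (),
    normalized_subject: str = "",
    message_external_id: str = "",
) -> str:
    """Return the external id of the thread a message should land in."""
    # 1. In-Reply-To: the direct parent wins when it is known.
    if in_reply_to and in_reply_to in thread_by_message_id:
        return thread_by_message_id[in_reply_to]
    # 2. References, newest-first (right-to-left).
    for reference in reversed(references):
        if reference in thread_by_message_id:
            return thread_by_message_id[reference]
    # 3. Normalised subject: a deterministic key shared by every message with that subject.
    if normalized_subject:
        return f"subj:{normalized_subject}"
    # 4. No hint and no subject: the message gets its own one-message thread.
    return f"msg:{message_external_id}"
```

Steps 3 and 4 return a deterministic key rather than looking anything up, which mirrors the idea that the create is a get-or-create on that key.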

ThreadAttachmentManager

python
class ThreadAttachmentManager(AngeeManager)

Owns the polymorphic edge from a model row to its chatter thread.

for_record

python
def for_record(record: Any, *, role: str = "chatter") -> Any | None

Return the existing thread attachment for record and role.

ensure_for_record

python
def ensure_for_record(record: Any,
                      *,
                      role: str = "chatter",
                      subject: str = "") -> Any

Return record's attachment, creating its private chatter thread if needed.

Both the thread and the attachment are resolved with get_or_create on a deterministic key (the thread on its record:…:role external id, the attachment on the (content_type, object_id, role) unique constraint), so two concurrent first-posts converge on one row instead of the second raising an IntegrityError. select_for_update cannot lock a row that does not exist yet, so a lock-then-create cannot serialise the first insert.
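
The "let the unique constraint arbitrate" pattern is sketched below with SQLite as a stand-in for the Django get_or_create the text names. The table, the column names, and the key "example-key" are hypothetical; the real external id format is not reproduced here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE thread (id INTEGER PRIMARY KEY, external_id TEXT NOT NULL UNIQUE)"
)


def ensure_thread(conn: sqlite3.Connection, external_id: str) -> int:
    """Insert-if-absent, then read back; the unique key decides who created the row."""
    # A second caller with the same key hits the unique constraint and inserts nothing.
    conn.execute(
        "INSERT OR IGNORE INTO thread (external_id) VALUES (?)", (external_id,)
    )
    row = conn.execute(
        "SELECT id FROM thread WHERE external_id = ?", (external_id,)
    ).fetchone()
    return row[0]


first = ensure_thread(conn, "example-key")
second = ensure_thread(conn, "example-key")
row_count = conn.execute("SELECT COUNT(*) FROM thread").fetchone()[0]
```

Both calls return the same id and only one row exists, which is the convergence property described above. No row lock is needed because the constraint, not a lock, resolves the race.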

teardown_for_record

python
def teardown_for_record(record: Any) -> None

Delete every chatter thread attached to record and its whole subtree.

A record's chatter thread is private to that record, so a hard delete of the record collects the thread graph with it: no orphaned thread survives to be mis-resolved when a later row reuses the primary key. Deleting each Thread cascades to its attachments, followers, activities, notifications, and participants. Messages, however, reference their thread through a SET_NULL foreign key (an ingested email message outlives a merged thread), so a private record thread's messages are deleted explicitly first.

ThreadFollowerQuerySet

python
class ThreadFollowerQuerySet(AngeeQuerySet[Any])

Chainable read scopes for record chatter followers.

for_attachment

python
def for_attachment(attachment: Any) -> ThreadFollowerQuerySet

Return followers bound to one record's chatter attachment edge.

ThreadFollowerManager

python
class ThreadFollowerManager(AngeeManager.from_queryset(ThreadFollowerQuerySet))

Owns user subscriptions to model-attached chatter threads.

for_record

python
def for_record(record: Any, *, role: str = "chatter") -> Any

Return followers for record and role.

is_following

python
def is_following(record: Any,
                 *,
                 user: Any = None,
                 user_id: Any = None,
                 role: str = "chatter") -> bool

Return whether user follows record's chatter thread.

subscribe

python
def subscribe(
    record: Any,
    *,
    user: Any = None,
    user_id: Any = None,
    role: str = "chatter",
    notification_policy: str = "inbox",
    subtype_keys: tuple[str, ...] = ()) -> Any

Ensure user follows record's chatter thread.

unsubscribe

python
def unsubscribe(record: Any,
                *,
                user: Any = None,
                user_id: Any = None,
                role: str = "chatter") -> int

Remove user from record's chatter followers.

ThreadNotificationQuerySet

python
class ThreadNotificationQuerySet(AngeeQuerySet[Any])

Chainable read scopes for per-recipient notification/read state.

DELIVERY_ERROR_STATUSES

Notification statuses that mean the author has a delivery error.

for_attachment

python
def for_attachment(attachment: Any) -> ThreadNotificationQuerySet

Return notifications bound to one record's chatter attachment edge.

unread

python
def unread() -> ThreadNotificationQuerySet

Return notifications the recipient has not yet read.

delivery_errors

python
def delivery_errors() -> ThreadNotificationQuerySet

Return notifications whose delivery bounced or raised an exception.

ThreadNotificationManager

python
class ThreadNotificationManager(
        AngeeManager.from_queryset(ThreadNotificationQuerySet))

Owns per-recipient notification/read state for record chatter messages.

for_record

python
def for_record(record: Any,
               *,
               user: Any = None,
               user_id: Any = None,
               role: str = "chatter",
               unread_only: bool = False) -> Any

Return notifications for user on record and role.

unread_count_for_record

python
def unread_count_for_record(record: Any,
                            *,
                            user: Any = None,
                            user_id: Any = None,
                            role: str = "chatter") -> int

Return the unread notification count for user on record.

mark_read_for_record

python
def mark_read_for_record(record: Any,
                         *,
                         user: Any = None,
                         user_id: Any = None,
                         role: str = "chatter") -> int

Mark user's unread notifications on record as read.

needaction_for_message

python
def needaction_for_message(message: Any,
                           *,
                           user: Any = None,
                           user_id: Any = None) -> bool

Return whether message has an unread notification for user.

mark_read_for_message

python
def mark_read_for_message(message: Any,
                          *,
                          user: Any = None,
                          user_id: Any = None) -> int

Mark one message's unread notification for user as read.

error_count_for_record

python
def error_count_for_record(record: Any,
                           *,
                           user: Any = None,
                           user_id: Any = None,
                           role: str = "chatter") -> int

Return the delivery-error count authored by user on record.

mark_failed

python
def mark_failed(notification: Any,
                *,
                status: str = "exception",
                failure_type: str = "unknown",
                failure_reason: str = "") -> Any

Mark one notification as a delivery failure.

mark_failed_for_message

python
def mark_failed_for_message(message: Any,
                            *,
                            user: Any = None,
                            user_id: Any = None,
                            status: str = "exception",
                            failure_type: str = "unknown",
                            failure_reason: str = "") -> Any

Mark one message notification for user as failed.

fanout_for_message

python
def fanout_for_message(
    message: Any,
    *,
    attachment: Any | None = None,
    subtype_key: str = "",
    owner_id: Any = None,
    recipient_user_ids: tuple[Any, ...] = ()) -> int

Create follower and explicit-recipient notifications for one message.

ThreadActivityQuerySet

python
class ThreadActivityQuerySet(AngeeQuerySet[Any])

Chainable read scopes for scheduled chatter activities.

open

python
def open() -> ThreadActivityQuerySet

Return activities still to do (not yet done or cancelled).

ThreadActivityManager

python
class ThreadActivityManager(AngeeManager.from_queryset(ThreadActivityQuerySet))

Owns scheduled activities attached to model chatter threads.

for_record

python
def for_record(record: Any,
               *,
               role: str = "chatter",
               include_done: bool = True) -> Any

Return activities for record and role.

schedule

python
def schedule(record: Any,
             *,
             user: Any = None,
             user_id: Any = None,
             role: str = "chatter",
             summary: str,
             note: str = "",
             due_date: Any = None,
             activity_type: str = "todo",
             metadata: dict[str, Any] | None = None) -> Any

Create a scheduled activity for record.

complete

python
def complete(activity: Any,
             *,
             feedback: str = "",
             post_message: bool = True) -> Any

Mark an activity done and optionally log that completion to the thread.

cancel

python
def cancel(activity: Any) -> Any

Cancel an activity without posting a completion message.

MessageStarManager

python
class MessageStarManager(AngeeManager)

Owns per-user Odoo-style starred message state.

is_starred

python
def is_starred(message: Any, *, user: Any | None) -> bool

Return whether user has starred message.

set_starred

python
def set_starred(message: Any,
                *,
                user: Any,
                starred: bool | None = None) -> bool

Set or toggle user's star on message and return the new state.
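
The set-or-toggle contract implied by starred: bool | None can be sketched against an in-memory set of (user, message) pairs. The storage shape and the function name are assumptions; only the None-means-toggle reading comes from the signature and description above.

```python
from typing import Optional


def set_starred_sketch(
    stars: set[tuple[str, str]],
    message_id: str,
    *,
    user_id: str,
    starred: Optional[bool] = None,
) -> bool:
    """Apply an explicit star state, or flip the current one when starred is None."""
    key = (user_id, message_id)
    target = (key not in stars) if starred is None else starred
    if target:
        stars.add(key)
    else:
        # discard() is a no-op when the star is already absent.
        stars.discard(key)
    return target
```

Passing an explicit True or False makes the call idempotent, while omitting it gives the toggle behaviour a star button would use.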

unstar_all

python
def unstar_all(*, user: Any) -> int

Remove all stars owned by user.

ReactionManager

python
class ReactionManager(AngeeManager)

Owns the attributed-reaction write — the row shape for a (message, handle, reaction).

MessageManager.set_reaction is the user-keyed chatter toggle; this is the distinct attributed write the social feed overlay lands for each external reactor. The row shape (fields + created_by default) lives here with the table owner so a producer batches through this owner instead of hand-rolling its own get_or_create.

attribute

python
def attribute(reactions: Any, *, owner_id: Any = None) -> int

Land attributed reactions in one insert; return how many rows were built.

reactions is an iterable of (message, handle, reaction) triples. One bulk_create inserts the batch, idempotent on the partial unique (message, handle, reaction) constraint via ignore_conflicts — so a re-sync re-landing the same reactions is a no-op — and every row carries the cleaned reaction content and the created_by (the field-backed REBAC owner).
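
The batch-insert behaviour can be sketched with SQLite's INSERT OR IGNORE standing in for bulk_create with ignore_conflicts. The schema is hypothetical, a plain UNIQUE constraint replaces the partial one, and "cleaning" is assumed to mean null-stripping only.

```python
import sqlite3
from typing import Iterable

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE reaction ("
    "message_id INTEGER NOT NULL, handle TEXT NOT NULL, reaction TEXT NOT NULL, "
    "UNIQUE (message_id, handle, reaction))"
)


def attribute_sketch(
    conn: sqlite3.Connection, reactions: Iterable[tuple[int, str, str]]
) -> int:
    """Insert the whole batch at once; rows that already exist are skipped silently."""
    rows = [
        # Assumption: cleaning means removing NUL characters.
        (message_id, handle, reaction.replace("\x00", ""))
        for message_id, handle, reaction in reactions
    ]
    conn.executemany(
        "INSERT OR IGNORE INTO reaction (message_id, handle, reaction) VALUES (?, ?, ?)",
        rows,
    )
    # The return value counts rows built, not rows actually inserted.
    return len(rows)


batch = [(1, "@ana", "👍"), (1, "@bo", "🎉")]
built_first = attribute_sketch(conn, batch)
built_again = attribute_sketch(conn, batch)
stored = conn.execute("SELECT COUNT(*) FROM reaction").fetchone()[0]
```

The second call builds the same two rows but changes nothing in the table, matching the re-sync no-op described above.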

MessageQuerySet

python
class MessageQuerySet(AngeeQuerySet[Any])

Chainable read scopes for chatter/ingest messages.

for_thread

python
def for_thread(thread: Any) -> MessageQuerySet

Return messages belonging to one thread.

visible_in_chatter

python
def visible_in_chatter() -> MessageQuerySet

Return the messages a record's chatter feed shows, excluding user notifications.

searching

python
def searching(term: str) -> MessageQuerySet

Return messages matching one Odoo-style chatter search token.

MessageManager

python
class MessageManager(AngeeManager.from_queryset(MessageQuerySet))

Owns the message ingest write path (idempotent, null-safe, F()-counted).

for_record

python
def for_record(record: Any,
               *,
               role: str = "chatter",
               search: str = "",
               limit: int = 50,
               before: Any | None = None,
               after: Any | None = None,
               around: Any | None = None) -> tuple[list[Any], int]

Return fetched chatter messages for a record, optionally search-filtered.

post_to_thread

python
def post_to_thread(
    thread: Any,
    *,
    body: str,
    subject: str = "",
    owner_id: Any = None,
    attachment: Any | None = None,
    attachments: tuple[Any, ...] = (),
    message_type: Message.MessageKind | None = None,
    subtype_key: str = "comment",
    subtype_model_label: str = "",
    parent: Any | None = None,
    tracking_values: tuple[TrackingChange | dict[str, Any], ...] = (),
    recipient_user_ids: tuple[Any, ...] = ()
) -> Any

Create an internal user-authored message in thread and bump thread counters.

message_type defaults to :attr:Message.MessageKind.COMMENT; the enum is the single source of truth for the stored kind, so None resolves to it here.

set_reaction

python
def set_reaction(message: Any,
                 *,
                 reaction: str,
                 action: str = "toggle",
                 user: Any) -> Any

Add, remove, or toggle the current user's reaction on message.

update_content

python
def update_content(message: Any, *, body: str, owner_id: Any = None) -> Any

Update a user-authored comment body, preserving Odoo's edit guardrails.

unlink_from_thread

python
def unlink_from_thread(message: Any, *, thread: Any) -> Any

Delete message from thread and repair thread denormalisations.

ingest

python
def ingest(parsed_messages: list[ParsedMessage],
           *,
           channel: Any,
           owner_id: Any = None,
           modality: Any = None,
           visibility: Any = None,
           message_kind: Any = None,
           quote_edges: bool = True) -> list[Any]

Upsert each parsed message into a thread with its parts/participants/edges.

Returns the landed :class:~angee.messaging.models.Message rows (a caller wanting the count takes len(...)) so an overlay — a public-feed engagement pass — reuses the rows this write already resolved instead of re-querying them by (platform, external_id).

Idempotent on (platform, external_id); null bytes stripped; thread counters bumped with F(). modality/visibility land each resolved thread under a non-email :class:~angee.messaging.models.Thread.Modality / :class:~angee.messaging.models.Thread.Visibility (a public feed passes PUBLIC_THREAD/PUBLIC); each defaults to the private email-thread shape. message_kind is the :class:~angee.messaging.models.Message.MessageKind each message lands under (this manager owns writing the column) — it defaults to EMAIL, and a public-feed producer passes COMMENT so a public post is not mislabelled email. quote_edges runs the RFC-5322 quotation builder — email's shared-fragment graph — and defaults on; a non-email producer whose short shared text would otherwise mint spurious quote edges passes quote_edges=False.
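
The idempotency claim rests on a composite natural key. The sketch below shows the idea with a dict keyed on (platform, external_id); it is a simplified stand-in for update_or_create, and the field names passed in are illustrative only.

```python
from typing import Any

MessageKey = tuple[str, str]  # (platform, external_id)


def upsert_message(
    store: dict[MessageKey, dict[str, Any]],
    platform: str,
    external_id: str,
    **fields: Any,
) -> bool:
    """Create the row on first sight, update it in place afterwards."""
    key = (platform, external_id)
    created = key not in store
    # setdefault returns the existing row when the key is already present.
    store.setdefault(key, {}).update(fields)
    return created


store: dict[MessageKey, dict[str, Any]] = {}
first = upsert_message(store, "email", "<a@example.com>", subject="Hello")
again = upsert_message(store, "email", "<a@example.com>", subject="Hello (edited)")
```

Re-running a sync over the same source therefore refreshes existing rows instead of duplicating them.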

PartQuerySet

python
class PartQuerySet(AngeeQuerySet[Any])

Chainable read scopes for message body parts.

attachments

python
def attachments() -> PartQuerySet

Return parts that carry a stored file — a message's attachment parts.

PartManager

python
class PartManager(AngeeManager.from_queryset(PartQuerySet))

Owns the recursive body-part rows; reads compose the PartQuerySet scopes.

MessageEdgeManager

python
class MessageEdgeManager(AngeeManager)

Owns the cross-message graph — derived quote edges from shared fragments.

relate

python
def relate(src: Any,
           dst: Any,
           *,
           kind: Any,
           owner_id: Any,
           fragment: Any = None,
           confidence: float = 1.0) -> Any

Write one typed edge from src to dst, idempotent on the (src, dst, kind) key.

The single edge-write entry point on the table owner: a social producer relating two messages (mention/crosspost/forward) writes through this one shape instead of its own get_or_create, and the batched quotation builder lands the same :meth:_edge_fields columns. src/dst/fragment accept a row or its id; returns the edge, creating it only when the (src, dst, kind) triple is new.

create_for_message

python
def create_for_message(message: Any) -> int

Write quote edges from message to others sharing a non-boilerplate fragment.

Skips fragments quoted by more than :data:_BOILERPLATE_CUTOFF messages; edge direction runs from the earlier message to the later one.
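
The shared-fragment rule can be sketched in memory. The data shape (message id mapped to a send time and a set of fragment keys), the cutoff value, and the function name are assumptions; the real builder joins on foreign keys in the database and its cutoff is not stated here.

```python
from collections import defaultdict

BOILERPLATE_CUTOFF = 3  # Hypothetical value for the example.


def quote_edges_for(
    message_id: str,
    messages: dict[str, tuple[int, set[str]]],
    cutoff: int = BOILERPLATE_CUTOFF,
) -> set[tuple[str, str, str]]:
    """Return (earlier, later, fragment) edges between message_id and its sharers."""
    # Index which messages carry each fragment.
    holders_by_fragment: dict[str, set[str]] = defaultdict(set)
    for other_id, (_, fragments) in messages.items():
        for fragment in fragments:
            holders_by_fragment[fragment].add(other_id)

    sent_at, fragments = messages[message_id]
    edges: set[tuple[str, str, str]] = set()
    for fragment in fragments:
        holders = holders_by_fragment[fragment]
        if len(holders) > cutoff:
            # Shared too widely (signatures, disclaimers): treated as boilerplate.
            continue
        for other_id in holders - {message_id}:
            other_sent_at = messages[other_id][0]
            # Edge direction runs from the earlier message to the later one.
            if other_sent_at <= sent_at:
                edges.add((other_id, message_id, fragment))
            else:
                edges.add((message_id, other_id, fragment))
    return edges
```

With a signature fragment present in four messages and a cutoff of three, only the genuinely quoted paragraph produces an edge.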

Released under the AGPL-3.0 License.