ros_acomms_net_db subpackage#
Submodules#
ros_acomms_net_db.index_list module#
- ros_acomms_net_db.index_list.compact(index_list)#
- ros_acomms_net_db.index_list.get_intersection_indices(list_a, list_b)#
- ros_acomms_net_db.index_list.get_touching_block(list_of_blocks, segment_to_touch)#
- ros_acomms_net_db.index_list.invert_index_list(index_list: list, limits: list) list #
ros_acomms_net_db.net_database module#
net_database#
Database model definitions for ros_acomms_net
- class ros_acomms_net_db.net_database.AppResponse(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
AppResponseDoesNotExist
- encoded_packet = <BlobField: AppResponse.encoded_packet>#
- encoded_packet_size_in_bits = <IntegerField: AppResponse.encoded_packet_size_in_bits>#
- id = <AutoField: AppResponse.id>#
- net_message = <ForeignKeyField: AppResponse.net_message>#
- net_message_id = <ForeignKeyField: AppResponse.net_message>#
- payload = <BlobField: AppResponse.payload>#
- payload_size_in_bits = <IntegerField: AppResponse.payload_size_in_bits>#
- pending_transmit = <BooleanField: AppResponse.pending_transmit>#
- tags#
- class ros_acomms_net_db.net_database.AppResponseTag(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
AppResponseTagDoesNotExist
- app_response = <ForeignKeyField: AppResponseTag.app_response>#
- app_response_id = <ForeignKeyField: AppResponseTag.app_response>#
- id = <AutoField: AppResponseTag.id>#
- tag = <CharField: AppResponseTag.tag>#
- class ros_acomms_net_db.net_database.BaseModel(*args, **kwargs)#
Bases:
Model
base model for all database models
- DoesNotExist#
alias of
BaseModelDoesNotExist
- id = <AutoField: BaseModel.id>#
- class ros_acomms_net_db.net_database.LinkPath(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
LinkPathDoesNotExist
- id = <AutoField: LinkPath.id>#
- modem = <CharField: LinkPath.modem>#
- net_destination = <IntegerField: LinkPath.net_destination>#
- net_message = <ForeignKeyField: LinkPath.net_message>#
- net_message_id = <ForeignKeyField: LinkPath.net_message>#
- net_source = <IntegerField: LinkPath.net_source>#
- rx_time = <DateTimeField: LinkPath.rx_time>#
- class ros_acomms_net_db.net_database.MessageTag(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
MessageTagDoesNotExist
- id = <AutoField: MessageTag.id>#
- net_message = <ForeignKeyField: MessageTag.net_message>#
- net_message_id = <ForeignKeyField: MessageTag.net_message>#
- tag = <CharField: MessageTag.tag>#
- class ros_acomms_net_db.net_database.NetMessage(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
NetMessageDoesNotExist
- ack_type = <IntegerField: NetMessage.ack_type>#
- all_fragments_acked_up_to(ack_type: int)#
- all_fragments_at_state(state: int)#
- property all_fragments_available#
- property all_fragments_endpoint_acked#
- property all_fragments_hop_acked#
- property all_fragments_sent#
- app_active = <BooleanField: NetMessage.app_active>#
- app_responses#
- append_received_fragment(start_idx: int, length_in_blocks: int, fragment_payload: bytes)#
- application_state = <IntegerField: NetMessage.application_state>#
- block_size_in_bits = <IntegerField: NetMessage.block_size_in_bits>#
- comms_active = <BooleanField: NetMessage.comms_active>#
- comms_state = <IntegerField: NetMessage.comms_state>#
- created_at = <DateTimeField: NetMessage.created_at>#
- encoded_packet = <BlobField: NetMessage.encoded_packet>#
- encoded_packet_size_in_bits = <IntegerField: NetMessage.encoded_packet_size_in_bits>#
- fragmentation_allowed = <BooleanField: NetMessage.fragmentation_allowed>#
- fragments_status#
- get_available_fragment_list() list #
- get_fragment_status_list(status: int) list #
- get_inverse_indices(ack_type: int) -> (<class 'int'>, <class 'int'>)#
- get_next_fragment(max_size_bits: int, ack_type: int) Tuple[int, int, bytes] #
- get_num_blocks_in_state(fragment_state: int | None)#
- get_pending_ack_fragment_list(ack_type: int | None = None) list #
- get_queued_ack_fragment_list(ack_type: int) list #
- id = <AutoField: NetMessage.id>#
- is_delivered_to_me = <BooleanField: NetMessage.is_delivered_to_me>#
- is_fragmented = <BooleanField: NetMessage.is_fragmented>#
- link_path#
- link_priority = <IntegerField: NetMessage.link_priority>#
- mark_fragment_acked(start_idx: int, fragment_size_in_blocks: int, ack_type)#
- mark_fragment_pending_send(start_idx: int, end_idx: int)#
- mark_fragment_state(start_idx: int, end_idx: int, fragment_state: int)#
- mark_full_payload_available()#
- mark_pending_fragments_as_sent()#
- net_destination = <IntegerField: NetMessage.net_destination>#
- net_priority = <IntegerField: NetMessage.net_priority>#
- net_source = <IntegerField: NetMessage.net_source>#
- num_acks_forwarded = <IntegerField: NetMessage.num_acks_forwarded>#
- num_app_responses_forwarded = <IntegerField: NetMessage.num_app_responses_forwarded>#
- property num_available_blocks#
- originator_uuid = <CharField: NetMessage.originator_uuid>#
- payload = <BlobField: NetMessage.payload>#
- payload_size_in_bits = <IntegerField: NetMessage.payload_size_in_bits>#
- property payload_size_in_blocks#
- pending_acks#
- send_an_ack = <BooleanField: NetMessage.send_an_ack>#
- tags#
- trace_link_path = <BooleanField: NetMessage.trace_link_path>#
- transaction_id = <IntegerField: NetMessage.transaction_id>#
- transmit_events#
- tx_tries_remaining = <IntegerField: NetMessage.tx_tries_remaining>#
- unknown_content = <BooleanField: NetMessage.unknown_content>#
- update_queued_ack_fragments(start_idx: int, length_in_blocks: int, ack_type: int | None)#
- class ros_acomms_net_db.net_database.NetMessageFragmentStatus(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
NetMessageFragmentStatusDoesNotExist
- fragment_end_index = <IntegerField: NetMessageFragmentStatus.fragment_end_index>#
- fragment_start_index = <IntegerField: NetMessageFragmentStatus.fragment_start_index>#
- fragment_state = <IntegerField: NetMessageFragmentStatus.fragment_state>#
- id = <AutoField: NetMessageFragmentStatus.id>#
- net_message = <ForeignKeyField: NetMessageFragmentStatus.net_message>#
- net_message_id = <ForeignKeyField: NetMessageFragmentStatus.net_message>#
- class ros_acomms_net_db.net_database.PendingAckToTransmit(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
PendingAckToTransmitDoesNotExist
- ack_type = <IntegerField: PendingAckToTransmit.ack_type>#
- encoded_packet = <BlobField: PendingAckToTransmit.encoded_packet>#
- encoded_packet_size_in_bits = <IntegerField: PendingAckToTransmit.encoded_packet_size_in_bits>#
- end_index = <IntegerField: PendingAckToTransmit.end_index>#
- id = <AutoField: PendingAckToTransmit.id>#
- is_fragment = <BooleanField: PendingAckToTransmit.is_fragment>#
- net_message = <ForeignKeyField: PendingAckToTransmit.net_message>#
- net_message_id = <ForeignKeyField: PendingAckToTransmit.net_message>#
- property size_in_blocks#
- start_index = <IntegerField: PendingAckToTransmit.start_index>#
- updated_at = <DateTimeField: PendingAckToTransmit.updated_at>#
- class ros_acomms_net_db.net_database.TransmitEvent(*args, **kwargs)#
Bases:
BaseModel
- DoesNotExist#
alias of
TransmitEventDoesNotExist
- id = <AutoField: TransmitEvent.id>#
- interface = <CharField: TransmitEvent.interface>#
- net_message = <ForeignKeyField: TransmitEvent.net_message>#
- net_message_id = <ForeignKeyField: TransmitEvent.net_message>#
- transmit_time = <DateTimeField: TransmitEvent.transmit_time>#
- ros_acomms_net_db.net_database.on_save_handler(model_class, instance, created)#
ros_acomms_net_db.net_database_handler module#
net_database#
Database setup
- class ros_acomms_net_db.net_database_handler.NetDatabaseHandler(init_db: bool = False)#
Bases:
object
Class to handle database setup and reset
- models = [<Model: NetMessage>, <Model: PendingAckToTransmit>, <Model: LinkPath>, <Model: AppResponse>, <Model: NetMessageFragmentStatus>, <Model: TransmitEvent>, <Model: MessageTag>, <Model: AppResponseTag>]#
- reset_db()#
Function to reset the database