# # Copyright (c) 2011 Thomas Graf # """Module providing access to network links This module provides an interface to view configured network links, modify them and to add and delete virtual network links. The following is a basic example: import netlink.core as netlink import netlink.route.link as link sock = netlink.Socket() sock.connect(netlink.NETLINK_ROUTE) cache = link.LinkCache() # create new empty link cache cache.refill(sock) # fill cache with all configured links eth0 = cache['eth0'] # lookup link "eth0" print eth0 # print basic configuration The module contains the following public classes: - Link -- Represents a network link. Instances can be created directly via the constructor (empty link objects) or via the refill() method of a LinkCache. - LinkCache -- Derived from netlink.Cache, holds any number of network links (Link instances). Main purpose is to keep a local list of all network links configured in the kernel. The following public functions exist: - get_from_kernel(socket, name) """ from __future__ import absolute_import __version__ = "0.1" __all__ = [ "LinkCache", "Link", ] import socket from .. import core as netlink from .. import capi as core_capi from . import capi as capi from .links import inet as inet from .. import util as util # Link statistics definitions RX_PACKETS = 0 TX_PACKETS = 1 RX_BYTES = 2 TX_BYTES = 3 RX_ERRORS = 4 TX_ERRORS = 5 RX_DROPPED = 6 TX_DROPPED = 7 RX_COMPRESSED = 8 TX_COMPRESSED = 9 RX_FIFO_ERR = 10 TX_FIFO_ERR = 11 RX_LEN_ERR = 12 RX_OVER_ERR = 13 RX_CRC_ERR = 14 RX_FRAME_ERR = 15 RX_MISSED_ERR = 16 TX_ABORT_ERR = 17 TX_CARRIER_ERR = 18 TX_HBEAT_ERR = 19 TX_WIN_ERR = 20 COLLISIONS = 21 MULTICAST = 22 IP6_INPKTS = 23 IP6_INHDRERRORS = 24 IP6_INTOOBIGERRORS = 25 IP6_INNOROUTES = 26 IP6_INADDRERRORS = 27 IP6_INUNKNOWNPROTOS = 28 IP6_INTRUNCATEDPKTS = 29 IP6_INDISCARDS = 30 IP6_INDELIVERS = 31 IP6_OUTFORWDATAGRAMS = 32 IP6_OUTPKTS = 33 IP6_OUTDISCARDS = 34 IP6_OUTNOROUTES = 35 IP6_REASMTIMEOUT = 36 IP6_REASMREQDS = 37 IP6_REASMOKS = 38 IP6_REASMFAILS = 39 IP6_FRAGOKS = 40 IP6_FRAGFAILS = 41 IP6_FRAGCREATES = 42 IP6_INMCASTPKTS = 43 IP6_OUTMCASTPKTS = 44 IP6_INBCASTPKTS = 45 IP6_OUTBCASTPKTS = 46 IP6_INOCTETS = 47 IP6_OUTOCTETS = 48 IP6_INMCASTOCTETS = 49 IP6_OUTMCASTOCTETS = 50 IP6_INBCASTOCTETS = 51 IP6_OUTBCASTOCTETS = 52 ICMP6_INMSGS = 53 ICMP6_INERRORS = 54 ICMP6_OUTMSGS = 55 ICMP6_OUTERRORS = 56 class LinkCache(netlink.Cache): """Cache of network links""" def __init__(self, family=socket.AF_UNSPEC, cache=None): if not cache: cache = self._alloc_cache_name("route/link") self._info_module = None self._protocol = netlink.NETLINK_ROUTE self._nl_cache = cache self._set_arg1(family) def __getitem__(self, key): if type(key) is int: link = capi.rtnl_link_get(self._nl_cache, key) else: link = capi.rtnl_link_get_by_name(self._nl_cache, key) if link is None: raise KeyError() else: return Link.from_capi(link) @staticmethod def _new_object(obj): return Link(obj) def _new_cache(self, cache): return LinkCache(family=self.arg1, cache=cache) class Link(netlink.Object): """Network link""" def __init__(self, obj=None): netlink.Object.__init__(self, "route/link", "link", obj) self._rtnl_link = self._obj2type(self._nl_object) if self.type: self._module_lookup("netlink.route.links." + self.type) self.inet = inet.InetLink(self) self.af = {"inet": self.inet} def __enter__(self): return self def __exit__(self, exc_type, exc_value, tb): if exc_type is None: self.change() else: return False @classmethod def from_capi(cls, obj): return cls(capi.link2obj(obj)) @staticmethod def _obj2type(obj): return capi.obj2link(obj) def __cmp__(self, other): return self.ifindex - other.ifindex @staticmethod def _new_instance(obj): if not obj: raise ValueError() return Link(obj) @property @netlink.nlattr(type=int, immutable=True, fmt=util.num) def ifindex(self): """interface index""" return capi.rtnl_link_get_ifindex(self._rtnl_link) @ifindex.setter def ifindex(self, value): capi.rtnl_link_set_ifindex(self._rtnl_link, int(value)) # ifindex is immutable but we assume that if _orig does not # have an ifindex specified, it was meant to be given here if capi.rtnl_link_get_ifindex(self._orig) == 0: capi.rtnl_link_set_ifindex(self._orig, int(value)) @property @netlink.nlattr(type=str, fmt=util.bold) def name(self): """Name of link""" return capi.rtnl_link_get_name(self._rtnl_link) @name.setter def name(self, value): capi.rtnl_link_set_name(self._rtnl_link, value) # name is the secondary identifier, if _orig does not have # the name specified yet, assume it was meant to be specified # here. ifindex will always take priority, therefore if ifindex # is specified as well, this will be ignored automatically. if capi.rtnl_link_get_name(self._orig) is None: capi.rtnl_link_set_name(self._orig, value) @property @netlink.nlattr(type=str, fmt=util.string) def flags(self): """Flags Setting this property will *Not* reset flags to value you supply in Examples: link.flags = '+xxx' # add xxx flag link.flags = 'xxx' # exactly the same link.flags = '-xxx' # remove xxx flag link.flags = [ '+xxx', '-yyy' ] # list operation """ flags = capi.rtnl_link_get_flags(self._rtnl_link) return capi.rtnl_link_flags2str(flags, 256)[0].split(",") def _set_flag(self, flag): if flag.startswith("-"): i = capi.rtnl_link_str2flags(flag[1:]) capi.rtnl_link_unset_flags(self._rtnl_link, i) elif flag.startswith("+"): i = capi.rtnl_link_str2flags(flag[1:]) capi.rtnl_link_set_flags(self._rtnl_link, i) else: i = capi.rtnl_link_str2flags(flag) capi.rtnl_link_set_flags(self._rtnl_link, i) @flags.setter def flags(self, value): if not (type(value) is str): for flag in value: self._set_flag(flag) else: self._set_flag(value) @property @netlink.nlattr(type=int, fmt=util.num) def mtu(self): """Maximum Transmission Unit""" return capi.rtnl_link_get_mtu(self._rtnl_link) @mtu.setter def mtu(self, value): capi.rtnl_link_set_mtu(self._rtnl_link, int(value)) @property @netlink.nlattr(type=int, immutable=True, fmt=util.num) def family(self): """Address family""" return capi.rtnl_link_get_family(self._rtnl_link) @family.setter def family(self, value): capi.rtnl_link_set_family(self._rtnl_link, value) @property @netlink.nlattr(type=str, fmt=util.addr) def address(self): """Hardware address (MAC address)""" a = capi.rtnl_link_get_addr(self._rtnl_link) return netlink.AbstractAddress(a) @address.setter def address(self, value): capi.rtnl_link_set_addr(self._rtnl_link, value._addr) @property @netlink.nlattr(type=str, fmt=util.addr) def broadcast(self): """Hardware broadcast address""" a = capi.rtnl_link_get_broadcast(self._rtnl_link) return netlink.AbstractAddress(a) @broadcast.setter def broadcast(self, value): capi.rtnl_link_set_broadcast(self._rtnl_link, value._addr) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def qdisc(self): """Name of qdisc (cannot be changed)""" return capi.rtnl_link_get_qdisc(self._rtnl_link) @qdisc.setter def qdisc(self, value): capi.rtnl_link_set_qdisc(self._rtnl_link, value) @property @netlink.nlattr(type=int, fmt=util.num) def txqlen(self): """Length of transmit queue""" return capi.rtnl_link_get_txqlen(self._rtnl_link) @txqlen.setter def txqlen(self, value): capi.rtnl_link_set_txqlen(self._rtnl_link, int(value)) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def arptype(self): """Type of link (cannot be changed)""" type_ = capi.rtnl_link_get_arptype(self._rtnl_link) return core_capi.nl_llproto2str(type_, 64)[0] @arptype.setter def arptype(self, value): i = core_capi.nl_str2llproto(value) capi.rtnl_link_set_arptype(self._rtnl_link, i) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string, title="state") def operstate(self): """Operational status""" operstate = capi.rtnl_link_get_operstate(self._rtnl_link) return capi.rtnl_link_operstate2str(operstate, 32)[0] @operstate.setter def operstate(self, value): i = capi.rtnl_link_str2operstate(value) capi.rtnl_link_set_operstate(self._rtnl_link, i) @property @netlink.nlattr(type=str, immutable=True, fmt=util.string) def mode(self): """Link mode""" mode = capi.rtnl_link_get_linkmode(self._rtnl_link) return capi.rtnl_link_mode2str(mode, 32)[0] @mode.setter def mode(self, value): i = capi.rtnl_link_str2mode(value) capi.rtnl_link_set_linkmode(self._rtnl_link, i) @property @netlink.nlattr(type=str, fmt=util.string) def alias(self): """Interface alias (SNMP)""" return capi.rtnl_link_get_ifalias(self._rtnl_link) @alias.setter def alias(self, value): capi.rtnl_link_set_ifalias(self._rtnl_link, value) @property @netlink.nlattr(type=str, fmt=util.string) def type(self): """Link type""" return capi.rtnl_link_get_type(self._rtnl_link) @type.setter def type(self, value): if capi.rtnl_link_set_type(self._rtnl_link, value) < 0: raise NameError("unknown info type") self._module_lookup("netlink.route.links." + value) def get_stat(self, stat): """Retrieve statistical information""" if type(stat) is str: stat = capi.rtnl_link_str2stat(stat) if stat < 0: raise NameError("unknown name of statistic") return capi.rtnl_link_get_stat(self._rtnl_link, stat) def enslave(self, slave, sock=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) return capi.rtnl_link_enslave(sock._sock, self._rtnl_link, slave._rtnl_link) def release(self, slave, sock=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) return capi.rtnl_link_release(sock._sock, self._rtnl_link, slave._rtnl_link) def add(self, sock=None, flags=None): if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) if not flags: flags = netlink.NLM_F_CREATE ret = capi.rtnl_link_add(sock._sock, self._rtnl_link, flags) if ret < 0: raise netlink.KernelError(ret) def change(self, sock=None, flags=0): """Commit changes made to the link object""" if sock is None: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) if not self._orig: raise netlink.NetlinkError("Original link not available") ret = capi.rtnl_link_change(sock._sock, self._orig, self._rtnl_link, flags) if ret < 0: raise netlink.KernelError(ret) def delete(self, sock=None): """Attempt to delete this link in the kernel""" if sock is None: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) ret = capi.rtnl_link_delete(sock._sock, self._rtnl_link) if ret < 0: raise netlink.KernelError(ret) ################################################################### # private properties # # Used for formatting output. USE AT OWN RISK @property def _state(self): if "up" in self.flags: buf = util.good("up") if "lowerup" not in self.flags: buf += " " + util.bad("no-carrier") else: buf = util.bad("down") return buf @property def _brief(self): return self._module_brief() + self._foreach_af("brief") @property def _flags(self): ignore = [ "up", "running", "lowerup", ] return ",".join([flag for flag in self.flags if flag not in ignore]) def _foreach_af(self, name, args=None): buf = "" for af in self.af: try: func = getattr(self.af[af], name) s = str(func(args)) if len(s) > 0: buf += " " + s except AttributeError: pass return buf def format(self, details=False, stats=False, indent=""): """Return link as formatted text""" fmt = util.MyFormatter(self, indent) buf = fmt.format( "{a|ifindex} {a|name} {a|arptype} {a|address} " "{a|_state} <{a|_flags}> {a|_brief}" ) if details: buf += fmt.nl("\t{t|mtu} {t|txqlen} {t|weight} " "{t|qdisc} {t|operstate}") buf += fmt.nl("\t{t|broadcast} {t|alias}") buf += self._foreach_af("details", fmt) if stats: lst = [ ["Packets", RX_PACKETS, TX_PACKETS], ["Bytes", RX_BYTES, TX_BYTES], ["Errors", RX_ERRORS, TX_ERRORS], ["Dropped", RX_DROPPED, TX_DROPPED], ["Compressed", RX_COMPRESSED, TX_COMPRESSED], ["FIFO Errors", RX_FIFO_ERR, TX_FIFO_ERR], ["Length Errors", RX_LEN_ERR, None], ["Over Errors", RX_OVER_ERR, None], ["CRC Errors", RX_CRC_ERR, None], ["Frame Errors", RX_FRAME_ERR, None], ["Missed Errors", RX_MISSED_ERR, None], ["Abort Errors", None, TX_ABORT_ERR], ["Carrier Errors", None, TX_CARRIER_ERR], ["Heartbeat Errors", None, TX_HBEAT_ERR], ["Window Errors", None, TX_WIN_ERR], ["Collisions", None, COLLISIONS], ["Multicast", None, MULTICAST], ["", None, None], ["Ipv6:", None, None], ["Packets", IP6_INPKTS, IP6_OUTPKTS], ["Bytes", IP6_INOCTETS, IP6_OUTOCTETS], ["Discards", IP6_INDISCARDS, IP6_OUTDISCARDS], ["Multicast Packets", IP6_INMCASTPKTS, IP6_OUTMCASTPKTS], ["Multicast Bytes", IP6_INMCASTOCTETS, IP6_OUTMCASTOCTETS], ["Broadcast Packets", IP6_INBCASTPKTS, IP6_OUTBCASTPKTS], ["Broadcast Bytes", IP6_INBCASTOCTETS, IP6_OUTBCASTOCTETS], ["Delivers", IP6_INDELIVERS, None], ["Forwarded", None, IP6_OUTFORWDATAGRAMS], ["No Routes", IP6_INNOROUTES, IP6_OUTNOROUTES], ["Header Errors", IP6_INHDRERRORS, None], ["Too Big Errors", IP6_INTOOBIGERRORS, None], ["Address Errors", IP6_INADDRERRORS, None], ["Unknown Protocol", IP6_INUNKNOWNPROTOS, None], ["Truncated Packets", IP6_INTRUNCATEDPKTS, None], ["Reasm Timeouts", IP6_REASMTIMEOUT, None], ["Reasm Requests", IP6_REASMREQDS, None], ["Reasm Failures", IP6_REASMFAILS, None], ["Reasm OK", IP6_REASMOKS, None], ["Frag Created", None, IP6_FRAGCREATES], ["Frag Failures", None, IP6_FRAGFAILS], ["Frag OK", None, IP6_FRAGOKS], ["", None, None], ["ICMPv6:", None, None], ["Messages", ICMP6_INMSGS, ICMP6_OUTMSGS], ["Errors", ICMP6_INERRORS, ICMP6_OUTERRORS], ] buf += "\n\t%s%s%s%s\n" % ( 33 * " ", util.title("RX"), 15 * " ", util.title("TX"), ) for row in lst: row[0] = util.kw(row[0]) row[1] = self.get_stat(row[1]) if row[1] else "" row[2] = self.get_stat(row[2]) if row[2] else "" buf += "\t{0[0]:27} {0[1]:>16} {0[2]:>16}\n".format(row) buf += self._foreach_af("stats") return buf def get(name, sock=None): """Lookup Link object directly from kernel""" if not name: raise ValueError() if not sock: sock = netlink.lookup_socket(netlink.NETLINK_ROUTE) link = capi.get_from_kernel(sock._sock, 0, name) if not link: return None return Link.from_capi(link) _link_cache = LinkCache() def resolve(name): _link_cache.refill() return _link_cache[name]